1use std::{
16 collections::hash_map::{Entry as HashMapEntry, HashMap},
17 fmt::Debug,
18 future::Future,
19 hash::Hash,
20 ops::Deref,
21 pin::Pin,
22 sync::Arc,
23 task::{Context, Poll},
24};
25
26use arc_swap::ArcSwap;
27use equivalent::Equivalent;
28#[cfg(feature = "tracing")]
29use fastrace::{
30 future::{FutureExt, InSpan},
31 Span,
32};
33use foyer_common::{
34 code::HashBuilder,
35 event::{Event, EventListener},
36 future::{Diversion, DiversionFuture},
37 metrics::Metrics,
38 properties::{Location, Properties, Source},
39 runtime::SingletonHandle,
40 strict_assert,
41 utils::scope::Scope,
42};
43use itertools::Itertools;
44use parking_lot::{Mutex, RwLock};
45use pin_project::pin_project;
46use tokio::{sync::oneshot, task::JoinHandle};
47
48use crate::{
49 error::{Error, Result},
50 eviction::{Eviction, Op},
51 indexer::{hash_table::HashTableIndexer, sentry::Sentry, Indexer},
52 pipe::NoopPipe,
53 record::{Data, Record},
54 Piece, Pipe,
55};
56
57pub trait Weighter<K, V>: Fn(&K, &V) -> usize + Send + Sync + 'static {}
61impl<K, V, T> Weighter<K, V> for T where T: Fn(&K, &V) -> usize + Send + Sync + 'static {}
62
63pub trait Filter<K, V>: Fn(&K, &V) -> bool + Send + Sync + 'static {}
74impl<K, V, T> Filter<K, V> for T where T: Fn(&K, &V) -> bool + Send + Sync + 'static {}
75
76pub struct RawCacheConfig<E, S>
77where
78 E: Eviction,
79 S: HashBuilder,
80{
81 pub capacity: usize,
82 pub shards: usize,
83 pub eviction_config: E::Config,
84 pub hash_builder: S,
85 pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
86 pub filter: Arc<dyn Filter<E::Key, E::Value>>,
87 pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
88 pub metrics: Arc<Metrics>,
89}
90
91struct RawCacheShard<E, S, I>
92where
93 E: Eviction,
94 S: HashBuilder,
95 I: Indexer<Eviction = E>,
96{
97 eviction: E,
98 indexer: Sentry<I>,
99
100 usage: usize,
101 capacity: usize,
102
103 #[expect(clippy::type_complexity)]
104 waiters: Mutex<HashMap<E::Key, Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>>>,
105
106 metrics: Arc<Metrics>,
107 _event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
108}
109
110impl<E, S, I> RawCacheShard<E, S, I>
111where
112 E: Eviction,
113 S: HashBuilder,
114 I: Indexer<Eviction = E>,
115{
116 fn evict(&mut self, target: usize, garbages: &mut Vec<(Event, Arc<Record<E>>)>) {
118 while self.usage > target {
120 let evicted = match self.eviction.pop() {
121 Some(evicted) => evicted,
122 None => break,
123 };
124 self.metrics.memory_evict.increase(1);
125
126 let e = self.indexer.remove(evicted.hash(), evicted.key()).unwrap();
127 assert_eq!(Arc::as_ptr(&evicted), Arc::as_ptr(&e));
128
129 strict_assert!(!evicted.as_ref().is_in_indexer());
130 strict_assert!(!evicted.as_ref().is_in_eviction());
131
132 self.usage -= evicted.weight();
133
134 garbages.push((Event::Evict, evicted));
135 }
136 }
137
138 fn emplace(
139 &mut self,
140 record: Arc<Record<E>>,
141 garbages: &mut Vec<(Event, Arc<Record<E>>)>,
142 waiters: &mut Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>,
143 ) -> Arc<Record<E>> {
144 *waiters = self.waiters.lock().remove(record.key()).unwrap_or_default();
145
146 let weight = record.weight();
147 let old_usage = self.usage;
148
149 self.evict(self.capacity.saturating_sub(weight), garbages);
151
152 if let Some(old) = self.indexer.insert(record.clone()) {
154 self.metrics.memory_replace.increase(1);
155
156 strict_assert!(!old.is_in_indexer());
157
158 if old.is_in_eviction() {
159 self.eviction.remove(&old);
160 }
161 strict_assert!(!old.is_in_eviction());
162
163 self.usage -= old.weight();
164
165 garbages.push((Event::Replace, old));
166 } else {
167 self.metrics.memory_insert.increase(1);
168 }
169 strict_assert!(record.is_in_indexer());
170
171 let ephemeral = record.properties().ephemeral().unwrap_or_default();
172 record.set_ephemeral(ephemeral);
173 if !ephemeral {
174 self.eviction.push(record.clone());
175 strict_assert!(record.is_in_eviction());
176 }
177
178 self.usage += weight;
179 record.inc_refs(waiters.len() + 1);
182
183 match self.usage.cmp(&old_usage) {
184 std::cmp::Ordering::Greater => self.metrics.memory_usage.increase((self.usage - old_usage) as _),
185 std::cmp::Ordering::Less => self.metrics.memory_usage.decrease((old_usage - self.usage) as _),
186 std::cmp::Ordering::Equal => {}
187 }
188
189 record
190 }
191
192 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::remove"))]
193 fn remove<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
194 where
195 Q: Hash + Equivalent<E::Key> + ?Sized,
196 {
197 let record = self.indexer.remove(hash, key)?;
198
199 if record.is_in_eviction() {
200 self.eviction.remove(&record);
201 }
202 strict_assert!(!record.is_in_indexer());
203 strict_assert!(!record.is_in_eviction());
204
205 self.usage -= record.weight();
206
207 self.metrics.memory_remove.increase(1);
208 self.metrics.memory_usage.decrease(record.weight() as _);
209
210 record.inc_refs(1);
211
212 Some(record)
213 }
214
215 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_noop"))]
216 fn get_noop<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
217 where
218 Q: Hash + Equivalent<E::Key> + ?Sized,
219 {
220 self.get_inner(hash, key)
221 }
222
223 #[cfg_attr(
224 feature = "tracing",
225 fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")
226 )]
227 fn get_immutable<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
228 where
229 Q: Hash + Equivalent<E::Key> + ?Sized,
230 {
231 self.get_inner(hash, key)
232 .inspect(|record| self.acquire_immutable(record))
233 }
234
235 #[cfg_attr(
236 feature = "tracing",
237 fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")
238 )]
239 fn get_mutable<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
240 where
241 Q: Hash + Equivalent<E::Key> + ?Sized,
242 {
243 self.get_inner(hash, key).inspect(|record| self.acquire_mutable(record))
244 }
245
246 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_inner"))]
247 fn get_inner<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
248 where
249 Q: Hash + Equivalent<E::Key> + ?Sized,
250 {
251 let record = match self.indexer.get(hash, key).cloned() {
252 Some(record) => {
253 self.metrics.memory_hit.increase(1);
254 record
255 }
256 None => {
257 self.metrics.memory_miss.increase(1);
258 return None;
259 }
260 };
261
262 strict_assert!(record.is_in_indexer());
263
264 record.set_ephemeral(false);
265
266 record.inc_refs(1);
267
268 Some(record)
269 }
270
271 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::clear"))]
272 fn clear(&mut self, garbages: &mut Vec<Arc<Record<E>>>) {
273 let records = self.indexer.drain().collect_vec();
274 self.eviction.clear();
275
276 let mut count = 0;
277
278 for record in records {
279 count += 1;
280 strict_assert!(!record.is_in_indexer());
281 strict_assert!(!record.is_in_eviction());
282
283 garbages.push(record);
284 }
285
286 self.metrics.memory_remove.increase(count);
287 }
288
289 #[cfg_attr(
290 feature = "tracing",
291 fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")
292 )]
293 fn acquire_immutable(&self, record: &Arc<Record<E>>) {
294 match E::acquire() {
295 Op::Immutable(f) => f(&self.eviction, record),
296 _ => unreachable!(),
297 }
298 }
299
300 #[cfg_attr(
301 feature = "tracing",
302 fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")
303 )]
304 fn acquire_mutable(&mut self, record: &Arc<Record<E>>) {
305 match E::acquire() {
306 Op::Mutable(mut f) => f(&mut self.eviction, record),
307 _ => unreachable!(),
308 }
309 }
310
311 #[cfg_attr(
312 feature = "tracing",
313 fastrace::trace(name = "foyer::memory::raw::shard::release_immutable")
314 )]
315 fn release_immutable(&self, record: &Arc<Record<E>>) {
316 match E::release() {
317 Op::Immutable(f) => f(&self.eviction, record),
318 _ => unreachable!(),
319 }
320 }
321
322 #[cfg_attr(
323 feature = "tracing",
324 fastrace::trace(name = "foyer::memory::raw::shard::release_mutable")
325 )]
326 fn release_mutable(&mut self, record: &Arc<Record<E>>) {
327 match E::release() {
328 Op::Mutable(mut f) => f(&mut self.eviction, record),
329 _ => unreachable!(),
330 }
331 }
332
333 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::fetch_noop"))]
334 fn fetch_noop(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
335 where
336 E::Key: Clone,
337 {
338 if let Some(record) = self.get_noop(hash, key) {
339 return RawShardFetch::Hit(record);
340 }
341
342 self.fetch_queue(key.clone())
343 }
344
345 #[cfg_attr(
346 feature = "tracing",
347 fastrace::trace(name = "foyer::memory::raw::shard::fetch_immutable")
348 )]
349 fn fetch_immutable(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
350 where
351 E::Key: Clone,
352 {
353 if let Some(record) = self.get_immutable(hash, key) {
354 return RawShardFetch::Hit(record);
355 }
356
357 self.fetch_queue(key.clone())
358 }
359
360 #[cfg_attr(
361 feature = "tracing",
362 fastrace::trace(name = "foyer::memory::raw::shard::fetch_mutable")
363 )]
364 fn fetch_mutable(&mut self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
365 where
366 E::Key: Clone,
367 {
368 if let Some(record) = self.get_mutable(hash, key) {
369 return RawShardFetch::Hit(record);
370 }
371
372 self.fetch_queue(key.clone())
373 }
374
375 #[cfg_attr(
376 feature = "tracing",
377 fastrace::trace(name = "foyer::memory::raw::shard::fetch_queue")
378 )]
379 fn fetch_queue(&self, key: E::Key) -> RawShardFetch<E, S, I> {
380 match self.waiters.lock().entry(key) {
381 HashMapEntry::Occupied(mut o) => {
382 let (tx, rx) = oneshot::channel();
383 o.get_mut().push(tx);
384 self.metrics.memory_queue.increase(1);
385 #[cfg(feature = "tracing")]
386 let wait = rx.in_span(Span::enter_with_local_parent(
387 "foyer::memory::raw::fetch_with_runtime::wait",
388 ));
389 #[cfg(not(feature = "tracing"))]
390 let wait = rx;
391 RawShardFetch::Wait(wait)
392 }
393 HashMapEntry::Vacant(v) => {
394 v.insert(vec![]);
395 self.metrics.memory_fetch.increase(1);
396 RawShardFetch::Miss
397 }
398 }
399 }
400}
401
402#[expect(clippy::type_complexity)]
403struct RawCacheInner<E, S, I>
404where
405 E: Eviction,
406 S: HashBuilder,
407 I: Indexer<Eviction = E>,
408{
409 shards: Vec<RwLock<RawCacheShard<E, S, I>>>,
410
411 capacity: usize,
412
413 hash_builder: Arc<S>,
414 weighter: Arc<dyn Weighter<E::Key, E::Value>>,
415 filter: Arc<dyn Filter<E::Key, E::Value>>,
416
417 metrics: Arc<Metrics>,
418 event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
419 pipe: ArcSwap<Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>>,
420}
421
422impl<E, S, I> RawCacheInner<E, S, I>
423where
424 E: Eviction,
425 S: HashBuilder,
426 I: Indexer<Eviction = E>,
427{
428 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::inner::clear"))]
429 fn clear(&self) {
430 let mut garbages = vec![];
431
432 self.shards
433 .iter()
434 .map(|shard| shard.write())
435 .for_each(|mut shard| shard.clear(&mut garbages));
436
437 if let Some(listener) = self.event_listener.as_ref() {
439 for record in garbages {
440 listener.on_leave(Event::Clear, record.key(), record.value());
441 }
442 }
443 }
444}
445
446pub struct RawCache<E, S, I = HashTableIndexer<E>>
447where
448 E: Eviction,
449 S: HashBuilder,
450 I: Indexer<Eviction = E>,
451{
452 inner: Arc<RawCacheInner<E, S, I>>,
453}
454
455impl<E, S, I> Drop for RawCacheInner<E, S, I>
456where
457 E: Eviction,
458 S: HashBuilder,
459 I: Indexer<Eviction = E>,
460{
461 fn drop(&mut self) {
462 self.clear();
463 }
464}
465
466impl<E, S, I> Clone for RawCache<E, S, I>
467where
468 E: Eviction,
469 S: HashBuilder,
470 I: Indexer<Eviction = E>,
471{
472 fn clone(&self) -> Self {
473 Self {
474 inner: self.inner.clone(),
475 }
476 }
477}
478
479impl<E, S, I> RawCache<E, S, I>
480where
481 E: Eviction,
482 S: HashBuilder,
483 I: Indexer<Eviction = E>,
484{
485 pub fn new(config: RawCacheConfig<E, S>) -> Self {
486 let shard_capacity = config.capacity / config.shards;
487
488 let shards = (0..config.shards)
489 .map(|_| RawCacheShard {
490 eviction: E::new(shard_capacity, &config.eviction_config),
491 indexer: Sentry::default(),
492 usage: 0,
493 capacity: shard_capacity,
494 waiters: Mutex::default(),
495 metrics: config.metrics.clone(),
496 _event_listener: config.event_listener.clone(),
497 })
498 .map(RwLock::new)
499 .collect_vec();
500
501 let pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>> =
502 Box::new(NoopPipe::default());
503
504 let inner = RawCacheInner {
505 shards,
506 capacity: config.capacity,
507 hash_builder: Arc::new(config.hash_builder),
508 weighter: config.weighter,
509 filter: config.filter,
510 metrics: config.metrics,
511 event_listener: config.event_listener,
512 pipe: ArcSwap::new(Arc::new(pipe)),
513 };
514
515 Self { inner: Arc::new(inner) }
516 }
517
518 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::resize"))]
519 pub fn resize(&self, capacity: usize) -> Result<()> {
520 let shards = self.inner.shards.len();
521 let shard_capacity = capacity / shards;
522
523 let handles = (0..shards)
524 .map(|i| {
525 let inner = self.inner.clone();
526 std::thread::spawn(move || {
527 let mut garbages = vec![];
528 let res = inner.shards[i].write().with(|mut shard| {
529 shard.eviction.update(shard_capacity, None).inspect(|_| {
530 shard.capacity = shard_capacity;
531 shard.evict(shard_capacity, &mut garbages)
532 })
533 });
534 let pipe = inner.pipe.load();
536 let piped = pipe.is_enabled();
537 if inner.event_listener.is_some() || piped {
538 for (event, record) in garbages {
539 if let Some(listener) = inner.event_listener.as_ref() {
540 listener.on_leave(event, record.key(), record.value())
541 }
542 if piped && event == Event::Evict {
543 pipe.send(Piece::new(record));
544 }
545 }
546 }
547 res
548 })
549 })
550 .collect_vec();
551
552 let errs = handles
553 .into_iter()
554 .map(|handle| handle.join().unwrap())
555 .filter(|res| res.is_err())
556 .map(|res| res.unwrap_err())
557 .collect_vec();
558 if !errs.is_empty() {
559 return Err(Error::multiple(errs));
560 }
561
562 Ok(())
563 }
564
565 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert"))]
566 pub fn insert(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
567 self.insert_with_properties(key, value, Default::default())
568 }
569
570 #[cfg_attr(
571 feature = "tracing",
572 fastrace::trace(name = "foyer::memory::raw::insert_with_properties")
573 )]
574 pub fn insert_with_properties(
575 &self,
576 key: E::Key,
577 value: E::Value,
578 mut properties: E::Properties,
579 ) -> RawCacheEntry<E, S, I> {
580 let hash = self.inner.hash_builder.hash_one(&key);
581 let weight = (self.inner.weighter)(&key, &value);
582 if !(self.inner.filter)(&key, &value) {
583 properties = properties.with_disposable(true);
584 }
585 let record = Arc::new(Record::new(Data {
586 key,
587 value,
588 properties,
589 hash,
590 weight,
591 }));
592 self.insert_inner(record)
593 }
594
595 #[doc(hidden)]
596 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_piece"))]
597 pub fn insert_piece(&self, piece: Piece<E::Key, E::Value, E::Properties>) -> RawCacheEntry<E, S, I> {
598 self.insert_inner(piece.into_record())
599 }
600
601 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_inner"))]
602 fn insert_inner(&self, record: Arc<Record<E>>) -> RawCacheEntry<E, S, I> {
603 if record.properties().disposable().unwrap_or_default() {
604 self.inner.shards[self.shard(record.hash())]
606 .write()
607 .remove(record.hash(), record.key())
608 .inspect(|r| {
609 if let Some(listener) = self.inner.event_listener.as_ref() {
611 listener.on_leave(Event::Replace, r.key(), r.value());
612 }
613 });
614
615 record.inc_refs(1);
618 return RawCacheEntry {
619 record,
620 inner: self.inner.clone(),
621 };
622 }
623
624 let mut garbages = vec![];
625 let mut waiters = vec![];
626
627 let record = self.inner.shards[self.shard(record.hash())]
628 .write()
629 .with(|mut shard| shard.emplace(record, &mut garbages, &mut waiters));
630
631 for waiter in waiters {
633 let _ = waiter.send(RawCacheEntry {
634 record: record.clone(),
635 inner: self.inner.clone(),
636 });
637 }
638
639 let pipe = self.inner.pipe.load();
641 let piped = pipe.is_enabled();
642 if self.inner.event_listener.is_some() || piped {
643 for (event, record) in garbages {
644 if let Some(listener) = self.inner.event_listener.as_ref() {
645 listener.on_leave(event, record.key(), record.value())
646 }
647 if piped && event == Event::Evict {
648 pipe.send(Piece::new(record));
649 }
650 }
651 }
652
653 RawCacheEntry {
654 record,
655 inner: self.inner.clone(),
656 }
657 }
658
659 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::evict_all"))]
661 pub fn evict_all(&self) {
662 let mut garbages = vec![];
663 for shard in self.inner.shards.iter() {
664 shard.write().evict(0, &mut garbages);
665 }
666
667 let pipe = self.inner.pipe.load();
669 let piped = pipe.is_enabled();
670 if self.inner.event_listener.is_some() || piped {
671 for (event, record) in garbages {
672 if let Some(listener) = self.inner.event_listener.as_ref() {
673 listener.on_leave(event, record.key(), record.value())
674 }
675 if piped && event == Event::Evict {
676 pipe.send(Piece::new(record));
677 }
678 }
679 }
680 }
681
682 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::flush"))]
687 pub async fn flush(&self) {
688 let mut garbages = vec![];
689 for shard in self.inner.shards.iter() {
690 shard.write().evict(0, &mut garbages);
691 }
692
693 let pipe = self.inner.pipe.load();
695 let piped = pipe.is_enabled();
696
697 if let Some(listener) = self.inner.event_listener.as_ref() {
698 for (event, record) in garbages.iter() {
699 listener.on_leave(*event, record.key(), record.value());
700 }
701 }
702 if piped {
703 let pieces = garbages.into_iter().map(|(_, record)| Piece::new(record)).collect_vec();
704 pipe.flush(pieces).await;
705 }
706 }
707
708 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::remove"))]
709 pub fn remove<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
710 where
711 Q: Hash + Equivalent<E::Key> + ?Sized,
712 {
713 let hash = self.inner.hash_builder.hash_one(key);
714
715 self.inner.shards[self.shard(hash)]
716 .write()
717 .with(|mut shard| {
718 shard.remove(hash, key).map(|record| RawCacheEntry {
719 inner: self.inner.clone(),
720 record,
721 })
722 })
723 .inspect(|record| {
724 if let Some(listener) = self.inner.event_listener.as_ref() {
726 listener.on_leave(Event::Remove, record.key(), record.value());
727 }
728 })
729 }
730
731 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get"))]
732 pub fn get<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
733 where
734 Q: Hash + Equivalent<E::Key> + ?Sized,
735 {
736 let hash = self.inner.hash_builder.hash_one(key);
737
738 let record = match E::acquire() {
739 Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
740 Op::Immutable(_) => self.inner.shards[self.shard(hash)]
741 .read()
742 .with(|shard| shard.get_immutable(hash, key)),
743 Op::Mutable(_) => self.inner.shards[self.shard(hash)]
744 .write()
745 .with(|mut shard| shard.get_mutable(hash, key)),
746 }?;
747
748 Some(RawCacheEntry {
749 inner: self.inner.clone(),
750 record,
751 })
752 }
753
754 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::contains"))]
755 pub fn contains<Q>(&self, key: &Q) -> bool
756 where
757 Q: Hash + Equivalent<E::Key> + ?Sized,
758 {
759 let hash = self.inner.hash_builder.hash_one(key);
760
761 self.inner.shards[self.shard(hash)]
762 .read()
763 .with(|shard| shard.indexer.get(hash, key).is_some())
764 }
765
766 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::touch"))]
767 pub fn touch<Q>(&self, key: &Q) -> bool
768 where
769 Q: Hash + Equivalent<E::Key> + ?Sized,
770 {
771 let hash = self.inner.hash_builder.hash_one(key);
772
773 match E::acquire() {
774 Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
775 Op::Immutable(_) => self.inner.shards[self.shard(hash)]
776 .read()
777 .with(|shard| shard.get_immutable(hash, key)),
778 Op::Mutable(_) => self.inner.shards[self.shard(hash)]
779 .write()
780 .with(|mut shard| shard.get_mutable(hash, key)),
781 }
782 .is_some()
783 }
784
785 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::clear"))]
786 pub fn clear(&self) {
787 self.inner.clear();
788 }
789
790 pub fn capacity(&self) -> usize {
791 self.inner.capacity
792 }
793
794 pub fn usage(&self) -> usize {
795 self.inner.shards.iter().map(|shard| shard.read().usage).sum()
796 }
797
798 pub fn metrics(&self) -> &Metrics {
799 &self.inner.metrics
800 }
801
802 pub fn hash_builder(&self) -> &Arc<S> {
803 &self.inner.hash_builder
804 }
805
806 pub fn shards(&self) -> usize {
807 self.inner.shards.len()
808 }
809
810 pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>) {
811 self.inner.pipe.store(Arc::new(pipe));
812 }
813
814 fn shard(&self, hash: u64) -> usize {
815 hash as usize % self.inner.shards.len()
816 }
817}
818
819pub struct RawCacheEntry<E, S, I = HashTableIndexer<E>>
820where
821 E: Eviction,
822 S: HashBuilder,
823 I: Indexer<Eviction = E>,
824{
825 inner: Arc<RawCacheInner<E, S, I>>,
826 record: Arc<Record<E>>,
827}
828
829impl<E, S, I> Debug for RawCacheEntry<E, S, I>
830where
831 E: Eviction,
832 S: HashBuilder,
833 I: Indexer<Eviction = E>,
834{
835 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
836 f.debug_struct("RawCacheEntry").field("record", &self.record).finish()
837 }
838}
839
840impl<E, S, I> Drop for RawCacheEntry<E, S, I>
841where
842 E: Eviction,
843 S: HashBuilder,
844 I: Indexer<Eviction = E>,
845{
846 fn drop(&mut self) {
847 let hash = self.record.hash();
848 let shard = &self.inner.shards[hash as usize % self.inner.shards.len()];
849
850 if self.record.dec_refs(1) == 0 {
851 if self.record.properties().disposable().unwrap_or_default() {
852 return;
854 }
855
856 match E::release() {
857 Op::Noop => {}
858 Op::Immutable(_) => shard.read().with(|shard| shard.release_immutable(&self.record)),
859 Op::Mutable(_) => shard.write().with(|mut shard| shard.release_mutable(&self.record)),
860 }
861
862 if self.record.is_ephemeral() {
863 shard
864 .write()
865 .with(|mut shard| shard.remove(hash, self.key()))
866 .inspect(|record| {
867 let pipe = self.inner.pipe.load();
869 let piped = pipe.is_enabled();
870 let event = Event::Evict;
871 if self.inner.event_listener.is_some() || piped {
872 if let Some(listener) = self.inner.event_listener.as_ref() {
873 listener.on_leave(Event::Evict, record.key(), record.value());
874 }
875 }
876 if piped && event == Event::Evict {
877 pipe.send(self.piece());
878 }
879 });
880 }
881 }
882 }
883}
884
885impl<E, S, I> Clone for RawCacheEntry<E, S, I>
886where
887 E: Eviction,
888 S: HashBuilder,
889 I: Indexer<Eviction = E>,
890{
891 fn clone(&self) -> Self {
892 self.record.inc_refs(1);
893 Self {
894 inner: self.inner.clone(),
895 record: self.record.clone(),
896 }
897 }
898}
899
900impl<E, S, I> Deref for RawCacheEntry<E, S, I>
901where
902 E: Eviction,
903 S: HashBuilder,
904 I: Indexer<Eviction = E>,
905{
906 type Target = E::Value;
907
908 fn deref(&self) -> &Self::Target {
909 self.value()
910 }
911}
912
913unsafe impl<E, S, I> Send for RawCacheEntry<E, S, I>
914where
915 E: Eviction,
916 S: HashBuilder,
917 I: Indexer<Eviction = E>,
918{
919}
920
921unsafe impl<E, S, I> Sync for RawCacheEntry<E, S, I>
922where
923 E: Eviction,
924 S: HashBuilder,
925 I: Indexer<Eviction = E>,
926{
927}
928
929impl<E, S, I> RawCacheEntry<E, S, I>
930where
931 E: Eviction,
932 S: HashBuilder,
933 I: Indexer<Eviction = E>,
934{
935 pub fn hash(&self) -> u64 {
936 self.record.hash()
937 }
938
939 pub fn key(&self) -> &E::Key {
940 self.record.key()
941 }
942
943 pub fn value(&self) -> &E::Value {
944 self.record.value()
945 }
946
947 pub fn properties(&self) -> &E::Properties {
948 self.record.properties()
949 }
950
951 pub fn weight(&self) -> usize {
952 self.record.weight()
953 }
954
955 pub fn refs(&self) -> usize {
956 self.record.refs()
957 }
958
959 pub fn is_outdated(&self) -> bool {
960 !self.record.is_in_indexer()
961 }
962
963 pub fn piece(&self) -> Piece<E::Key, E::Value, E::Properties> {
964 Piece::new(self.record.clone())
965 }
966}
967
968#[derive(Debug, Clone, Copy, PartialEq, Eq)]
970pub enum FetchState {
971 Hit,
973 Wait,
975 Miss,
977}
978
979#[derive(Debug)]
981pub struct FetchContext {
982 pub throttled: bool,
984 pub source: Source,
986}
987
988enum RawShardFetch<E, S, I>
989where
990 E: Eviction,
991 S: HashBuilder,
992 I: Indexer<Eviction = E>,
993{
994 Hit(Arc<Record<E>>),
995 Wait(RawFetchWait<E, S, I>),
996 Miss,
997}
998
999pub type RawFetch<E, ER, S, I = HashTableIndexer<E>> =
1000 DiversionFuture<RawFetchInner<E, ER, S, I>, std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchContext>;
1001
1002type RawFetchHit<E, S, I> = Option<RawCacheEntry<E, S, I>>;
1003#[cfg(feature = "tracing")]
1004type RawFetchWait<E, S, I> = InSpan<oneshot::Receiver<RawCacheEntry<E, S, I>>>;
1005#[cfg(not(feature = "tracing"))]
1006type RawFetchWait<E, S, I> = oneshot::Receiver<RawCacheEntry<E, S, I>>;
1007type RawFetchMiss<E, I, S, ER, DFS> = JoinHandle<Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, DFS>>;
1008
1009pub enum FetchTarget<K, V, P> {
1011 Value(V),
1013 Piece(Piece<K, V, P>),
1015}
1016
1017impl<K, V, P> Debug for FetchTarget<K, V, P> {
1018 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1019 f.debug_struct("FetchTarget").finish()
1020 }
1021}
1022
1023impl<K, V, P> From<V> for FetchTarget<K, V, P> {
1024 fn from(value: V) -> Self {
1025 Self::Value(value)
1026 }
1027}
1028
1029impl<K, V, P> From<Piece<K, V, P>> for FetchTarget<K, V, P> {
1030 fn from(piece: Piece<K, V, P>) -> Self {
1031 Self::Piece(piece)
1032 }
1033}
1034
1035#[pin_project(project = RawFetchInnerProj)]
1036pub enum RawFetchInner<E, ER, S, I>
1037where
1038 E: Eviction,
1039 S: HashBuilder,
1040 I: Indexer<Eviction = E>,
1041{
1042 Hit(RawFetchHit<E, S, I>),
1043 Wait(#[pin] RawFetchWait<E, S, I>),
1044 Miss(#[pin] RawFetchMiss<E, I, S, ER, FetchContext>),
1045}
1046
1047impl<E, ER, S, I> RawFetchInner<E, ER, S, I>
1048where
1049 E: Eviction,
1050 S: HashBuilder,
1051 I: Indexer<Eviction = E>,
1052{
1053 pub fn state(&self) -> FetchState {
1054 match self {
1055 RawFetchInner::Hit(_) => FetchState::Hit,
1056 RawFetchInner::Wait(_) => FetchState::Wait,
1057 RawFetchInner::Miss(_) => FetchState::Miss,
1058 }
1059 }
1060}
1061
1062impl<E, ER, S, I> Future for RawFetchInner<E, ER, S, I>
1063where
1064 E: Eviction,
1065 ER: From<Error>,
1066 S: HashBuilder,
1067 I: Indexer<Eviction = E>,
1068{
1069 type Output = Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchContext>;
1070
1071 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1072 match self.project() {
1073 RawFetchInnerProj::Hit(opt) => Poll::Ready(Ok(opt.take().unwrap()).into()),
1074 RawFetchInnerProj::Wait(waiter) => waiter.poll(cx).map_err(|e| Error::wait(e).into()).map(Diversion::from),
1075 RawFetchInnerProj::Miss(handle) => handle.poll(cx).map(|join| join.unwrap()),
1076 }
1077 }
1078}
1079
1080impl<E, S, I> RawCache<E, S, I>
1081where
1082 E: Eviction,
1083 S: HashBuilder,
1084 I: Indexer<Eviction = E>,
1085 E::Key: Clone,
1086{
1087 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::fetch"))]
1088 pub fn fetch<F, FU, ER>(&self, key: E::Key, fetch: F) -> RawFetch<E, ER, S, I>
1089 where
1090 F: FnOnce() -> FU,
1091 FU: Future<Output = std::result::Result<E::Value, ER>> + Send + 'static,
1092 ER: Send + 'static + Debug,
1093 {
1094 self.fetch_inner(
1095 key,
1096 Default::default(),
1097 fetch,
1098 &tokio::runtime::Handle::current().into(),
1099 )
1100 }
1101
1102 #[cfg_attr(
1103 feature = "tracing",
1104 fastrace::trace(name = "foyer::memory::raw::fetch_with_properties")
1105 )]
1106 pub fn fetch_with_properties<F, FU, ER, ID>(
1107 &self,
1108 key: E::Key,
1109 properties: E::Properties,
1110 fetch: F,
1111 ) -> RawFetch<E, ER, S, I>
1112 where
1113 F: FnOnce() -> FU,
1114 FU: Future<Output = ID> + Send + 'static,
1115 ER: Send + 'static + Debug,
1116 ID: Into<Diversion<std::result::Result<E::Value, ER>, FetchContext>>,
1117 {
1118 self.fetch_inner(key, properties, fetch, &tokio::runtime::Handle::current().into())
1119 }
1120
1121 #[doc(hidden)]
1125 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::fetch_inner"))]
1126 pub fn fetch_inner<F, FU, ER, ID, IT>(
1127 &self,
1128 key: E::Key,
1129 mut properties: E::Properties,
1130 fetch: F,
1131 runtime: &SingletonHandle,
1132 ) -> RawFetch<E, ER, S, I>
1133 where
1134 F: FnOnce() -> FU,
1135 FU: Future<Output = ID> + Send + 'static,
1136 ER: Send + 'static + Debug,
1137 ID: Into<Diversion<std::result::Result<IT, ER>, FetchContext>>,
1138 IT: Into<FetchTarget<E::Key, E::Value, E::Properties>>,
1139 {
1140 let hash = self.inner.hash_builder.hash_one(&key);
1141
1142 let raw = match E::acquire() {
1143 Op::Noop => self.inner.shards[self.shard(hash)].read().fetch_noop(hash, &key),
1144 Op::Immutable(_) => self.inner.shards[self.shard(hash)].read().fetch_immutable(hash, &key),
1145 Op::Mutable(_) => self.inner.shards[self.shard(hash)].write().fetch_mutable(hash, &key),
1146 };
1147
1148 match raw {
1149 RawShardFetch::Hit(record) => {
1150 return RawFetch::new(RawFetchInner::Hit(Some(RawCacheEntry {
1151 record,
1152 inner: self.inner.clone(),
1153 })))
1154 }
1155 RawShardFetch::Wait(future) => return RawFetch::new(RawFetchInner::Wait(future)),
1156 RawShardFetch::Miss => {}
1157 }
1158
1159 let cache = self.clone();
1160 let future = fetch();
1161 let join = runtime.spawn({
1162 let task = async move {
1163 #[cfg(feature = "tracing")]
1164 let Diversion { target, store } = future
1165 .in_span(Span::enter_with_local_parent("foyer::memory::raw::fetch_inner::fn"))
1166 .await
1167 .into();
1168 #[cfg(not(feature = "tracing"))]
1169 let Diversion { target, store } = future.await.into();
1170
1171 let target = match target {
1172 Ok(value) => value,
1173 Err(e) => {
1174 cache.inner.shards[cache.shard(hash)].read().waiters.lock().remove(&key);
1175 tracing::debug!("[fetch]: error raise while fetching, all waiter are dropped, err: {e:?}");
1176 return Diversion { target: Err(e), store };
1177 }
1178 };
1179 if let Some(ctx) = store.as_ref() {
1180 if ctx.throttled {
1181 properties = properties.with_location(Location::InMem)
1182 }
1183 properties = properties.with_source(ctx.source)
1184 };
1185 let location = properties.location().unwrap_or_default();
1186 properties = properties.with_ephemeral(matches! {location, Location::OnDisk});
1187 let entry = match target.into() {
1188 FetchTarget::Value(value) => cache.insert_with_properties(key, value, properties),
1189 FetchTarget::Piece(p) => cache.insert_inner(p.into_record::<E>()),
1190 };
1191 Diversion {
1192 target: Ok(entry),
1193 store,
1194 }
1195 };
1196 #[cfg(feature = "tracing")]
1197 let task = task.in_span(Span::enter_with_local_parent(
1198 "foyer::memory::generic::fetch_with_runtime::spawn",
1199 ));
1200 task
1201 });
1202
1203 RawFetch::new(RawFetchInner::Miss(join))
1204 }
1205}
1206
1207#[cfg(test)]
1208mod tests {
1209 use foyer_common::hasher::ModHasher;
1210 use rand::{rngs::SmallRng, seq::IndexedRandom, RngCore, SeedableRng};
1211
1212 use super::*;
1213 use crate::{
1214 eviction::{
1215 fifo::{Fifo, FifoConfig},
1216 lfu::{Lfu, LfuConfig},
1217 lru::{Lru, LruConfig},
1218 s3fifo::{S3Fifo, S3FifoConfig},
1219 sieve::{Sieve, SieveConfig},
1220 test_utils::TestProperties,
1221 },
1222 test_utils::PiecePipe,
1223 };
1224
1225 fn is_send_sync_static<T: Send + Sync + 'static>() {}
1226
1227 #[test]
1228 fn test_send_sync_static() {
1229 is_send_sync_static::<RawCache<Fifo<(), (), TestProperties>, ModHasher>>();
1230 is_send_sync_static::<RawCache<S3Fifo<(), (), TestProperties>, ModHasher>>();
1231 is_send_sync_static::<RawCache<Lfu<(), (), TestProperties>, ModHasher>>();
1232 is_send_sync_static::<RawCache<Lru<(), (), TestProperties>, ModHasher>>();
1233 is_send_sync_static::<RawCache<Sieve<(), (), TestProperties>, ModHasher>>();
1234 }
1235
1236 #[expect(clippy::type_complexity)]
1237 fn fifo_cache_for_test(
1238 ) -> RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Fifo<u64, u64, TestProperties>>> {
1239 RawCache::new(RawCacheConfig {
1240 capacity: 256,
1241 shards: 4,
1242 eviction_config: FifoConfig::default(),
1243 hash_builder: Default::default(),
1244 weighter: Arc::new(|_, _| 1),
1245 filter: Arc::new(|_, _| true),
1246 event_listener: None,
1247 metrics: Arc::new(Metrics::noop()),
1248 })
1249 }
1250
1251 #[expect(clippy::type_complexity)]
1252 fn s3fifo_cache_for_test(
1253 ) -> RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<S3Fifo<u64, u64, TestProperties>>> {
1254 RawCache::new(RawCacheConfig {
1255 capacity: 256,
1256 shards: 4,
1257 eviction_config: S3FifoConfig::default(),
1258 hash_builder: Default::default(),
1259 weighter: Arc::new(|_, _| 1),
1260 filter: Arc::new(|_, _| true),
1261 event_listener: None,
1262 metrics: Arc::new(Metrics::noop()),
1263 })
1264 }
1265
1266 #[expect(clippy::type_complexity)]
1267 fn lru_cache_for_test(
1268 ) -> RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lru<u64, u64, TestProperties>>> {
1269 RawCache::new(RawCacheConfig {
1270 capacity: 256,
1271 shards: 4,
1272 eviction_config: LruConfig::default(),
1273 hash_builder: Default::default(),
1274 weighter: Arc::new(|_, _| 1),
1275 filter: Arc::new(|_, _| true),
1276 event_listener: None,
1277 metrics: Arc::new(Metrics::noop()),
1278 })
1279 }
1280
1281 #[expect(clippy::type_complexity)]
1282 fn lfu_cache_for_test(
1283 ) -> RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lfu<u64, u64, TestProperties>>> {
1284 RawCache::new(RawCacheConfig {
1285 capacity: 256,
1286 shards: 4,
1287 eviction_config: LfuConfig::default(),
1288 hash_builder: Default::default(),
1289 weighter: Arc::new(|_, _| 1),
1290 filter: Arc::new(|_, _| true),
1291 event_listener: None,
1292 metrics: Arc::new(Metrics::noop()),
1293 })
1294 }
1295
1296 #[expect(clippy::type_complexity)]
1297 fn sieve_cache_for_test(
1298 ) -> RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Sieve<u64, u64, TestProperties>>> {
1299 RawCache::new(RawCacheConfig {
1300 capacity: 256,
1301 shards: 4,
1302 eviction_config: SieveConfig {},
1303 hash_builder: Default::default(),
1304 weighter: Arc::new(|_, _| 1),
1305 filter: Arc::new(|_, _| true),
1306 event_listener: None,
1307 metrics: Arc::new(Metrics::noop()),
1308 })
1309 }
1310
1311 #[test_log::test]
1312 fn test_insert_ephemeral() {
1313 let fifo = fifo_cache_for_test();
1314
1315 let e1 = fifo.insert_with_properties(1, 1, TestProperties::default().with_ephemeral(true));
1316 assert_eq!(fifo.usage(), 1);
1317 drop(e1);
1318 assert_eq!(fifo.usage(), 0);
1319
1320 let e2a = fifo.insert_with_properties(2, 2, TestProperties::default().with_ephemeral(true));
1321 assert_eq!(fifo.usage(), 1);
1322 let e2b = fifo.get(&2).expect("entry 2 should exist");
1323 drop(e2a);
1324 assert_eq!(fifo.usage(), 1);
1325 drop(e2b);
1326 assert_eq!(fifo.usage(), 1);
1327 }
1328
1329 #[test_log::test]
1330 fn test_insert_disposable() {
1331 let fifo = fifo_cache_for_test();
1332
1333 let e1 = fifo.insert_with_properties(1, 1, TestProperties::default().with_disposable(true));
1334 assert_eq!(fifo.usage(), 0);
1335 drop(e1);
1336 assert_eq!(fifo.usage(), 0);
1337
1338 let e2a = fifo.insert_with_properties(2, 2, TestProperties::default().with_disposable(true));
1339 assert_eq!(fifo.usage(), 0);
1340 assert!(fifo.get(&2).is_none());
1341 assert_eq!(fifo.usage(), 0);
1342 drop(e2a);
1343 assert_eq!(fifo.usage(), 0);
1344
1345 let fifo = fifo_cache_for_test();
1346 fifo.insert(1, 1);
1347 assert_eq!(fifo.usage(), 1);
1348 assert_eq!(fifo.get(&1).unwrap().value(), &1);
1349 let e2 = fifo.insert_with_properties(1, 100, TestProperties::default().with_disposable(true));
1350 assert_eq!(fifo.usage(), 0);
1351 drop(e2);
1352 assert_eq!(fifo.usage(), 0);
1353 assert!(fifo.get(&1).is_none());
1354 }
1355
1356 #[expect(clippy::type_complexity)]
1357 #[test_log::test]
1358 fn test_insert_filter() {
1359 let fifo: RawCache<
1360 Fifo<u64, u64, TestProperties>,
1361 ModHasher,
1362 HashTableIndexer<Fifo<u64, u64, TestProperties>>,
1363 > = RawCache::new(RawCacheConfig {
1364 capacity: 256,
1365 shards: 4,
1366 eviction_config: FifoConfig::default(),
1367 hash_builder: Default::default(),
1368 weighter: Arc::new(|_, _| 1),
1369 filter: Arc::new(|k, _| !matches!(*k, 42)),
1370 event_listener: None,
1371 metrics: Arc::new(Metrics::noop()),
1372 });
1373
1374 fifo.insert(1, 1);
1375 fifo.insert(2, 2);
1376 fifo.insert(42, 42);
1377 assert_eq!(fifo.usage(), 2);
1378 assert_eq!(fifo.get(&1).unwrap().value(), &1);
1379 assert_eq!(fifo.get(&2).unwrap().value(), &2);
1380 assert!(fifo.get(&42).is_none());
1381 }
1382
1383 #[test]
1384 fn test_evict_all() {
1385 let pipe = Box::new(PiecePipe::default());
1386
1387 let fifo = fifo_cache_for_test();
1388 fifo.set_pipe(pipe.clone());
1389 for i in 0..fifo.capacity() as _ {
1390 fifo.insert(i, i);
1391 }
1392 assert_eq!(fifo.usage(), fifo.capacity());
1393
1394 fifo.evict_all();
1395 let mut pieces = pipe
1396 .pieces()
1397 .iter()
1398 .map(|p| (p.hash(), *p.key(), *p.value()))
1399 .collect_vec();
1400 pieces.sort_by_key(|t| t.0);
1401 let expected = (0..fifo.capacity() as u64).map(|i| (i, i, i)).collect_vec();
1402 assert_eq!(pieces, expected);
1403 }
1404
1405 #[test]
1406 fn test_insert_size_over_capacity() {
1407 let cache: RawCache<Fifo<Vec<u8>, Vec<u8>, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1408 capacity: 4 * 1024, shards: 1,
1410 eviction_config: FifoConfig::default(),
1411 hash_builder: Default::default(),
1412 weighter: Arc::new(|k, v| k.len() + v.len()),
1413 filter: Arc::new(|_, _| true),
1414 event_listener: None,
1415 metrics: Arc::new(Metrics::noop()),
1416 });
1417
1418 let key = vec![b'k'; 1024]; let value = vec![b'v'; 5 * 1024]; cache.insert(key.clone(), value.clone());
1422 assert_eq!(cache.usage(), 6 * 1024);
1423 assert_eq!(cache.get(&key).unwrap().value(), &value);
1424 }
1425
1426 fn test_resize<E>(cache: &RawCache<E, ModHasher, HashTableIndexer<E>>)
1427 where
1428 E: Eviction<Key = u64, Value = u64>,
1429 {
1430 let capacity = cache.capacity();
1431 for i in 0..capacity as u64 * 2 {
1432 cache.insert(i, i);
1433 }
1434 assert_eq!(cache.usage(), capacity);
1435 cache.resize(capacity / 2).unwrap();
1436 assert_eq!(cache.usage(), capacity / 2);
1437 for i in 0..capacity as u64 * 2 {
1438 cache.insert(i, i);
1439 }
1440 assert_eq!(cache.usage(), capacity / 2);
1441 }
1442
1443 #[test]
1444 fn test_fifo_cache_resize() {
1445 let cache = fifo_cache_for_test();
1446 test_resize(&cache);
1447 }
1448
1449 #[test]
1450 fn test_s3fifo_cache_resize() {
1451 let cache = s3fifo_cache_for_test();
1452 test_resize(&cache);
1453 }
1454
1455 #[test]
1456 fn test_lru_cache_resize() {
1457 let cache = lru_cache_for_test();
1458 test_resize(&cache);
1459 }
1460
1461 #[test]
1462 fn test_lfu_cache_resize() {
1463 let cache = lfu_cache_for_test();
1464 test_resize(&cache);
1465 }
1466
1467 #[test]
1468 fn test_sieve_cache_resize() {
1469 let cache = sieve_cache_for_test();
1470 test_resize(&cache);
1471 }
1472
1473 mod fuzzy {
1474 use foyer_common::properties::Hint;
1475
1476 use super::*;
1477
1478 fn fuzzy<E, S>(cache: RawCache<E, S>, hints: Vec<Hint>)
1479 where
1480 E: Eviction<Key = u64, Value = u64, Properties = TestProperties>,
1481 S: HashBuilder,
1482 {
1483 let handles = (0..8)
1484 .map(|i| {
1485 let c = cache.clone();
1486 let hints = hints.clone();
1487 std::thread::spawn(move || {
1488 let mut rng = SmallRng::seed_from_u64(i);
1489 for _ in 0..100000 {
1490 let key = rng.next_u64();
1491 if let Some(entry) = c.get(&key) {
1492 assert_eq!(key, *entry);
1493 drop(entry);
1494 continue;
1495 }
1496 let hint = hints.choose(&mut rng).cloned().unwrap();
1497 c.insert_with_properties(key, key, TestProperties::default().with_hint(hint));
1498 }
1499 })
1500 })
1501 .collect_vec();
1502
1503 handles.into_iter().for_each(|handle| handle.join().unwrap());
1504
1505 assert_eq!(cache.usage(), cache.capacity());
1506 }
1507
1508 #[test_log::test]
1509 fn test_fifo_cache_fuzzy() {
1510 let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1511 capacity: 256,
1512 shards: 4,
1513 eviction_config: FifoConfig::default(),
1514 hash_builder: Default::default(),
1515 weighter: Arc::new(|_, _| 1),
1516 filter: Arc::new(|_, _| true),
1517 event_listener: None,
1518 metrics: Arc::new(Metrics::noop()),
1519 });
1520 let hints = vec![Hint::Normal];
1521 fuzzy(cache, hints);
1522 }
1523
1524 #[test_log::test]
1525 fn test_s3fifo_cache_fuzzy() {
1526 let cache: RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1527 capacity: 256,
1528 shards: 4,
1529 eviction_config: S3FifoConfig::default(),
1530 hash_builder: Default::default(),
1531 weighter: Arc::new(|_, _| 1),
1532 filter: Arc::new(|_, _| true),
1533 event_listener: None,
1534 metrics: Arc::new(Metrics::noop()),
1535 });
1536 let hints = vec![Hint::Normal];
1537 fuzzy(cache, hints);
1538 }
1539
1540 #[test_log::test]
1541 fn test_lru_cache_fuzzy() {
1542 let cache: RawCache<Lru<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1543 capacity: 256,
1544 shards: 4,
1545 eviction_config: LruConfig::default(),
1546 hash_builder: Default::default(),
1547 weighter: Arc::new(|_, _| 1),
1548 filter: Arc::new(|_, _| true),
1549 event_listener: None,
1550 metrics: Arc::new(Metrics::noop()),
1551 });
1552 let hints = vec![Hint::Normal, Hint::Low];
1553 fuzzy(cache, hints);
1554 }
1555
1556 #[test_log::test]
1557 fn test_lfu_cache_fuzzy() {
1558 let cache: RawCache<Lfu<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1559 capacity: 256,
1560 shards: 4,
1561 eviction_config: LfuConfig::default(),
1562 hash_builder: Default::default(),
1563 weighter: Arc::new(|_, _| 1),
1564 filter: Arc::new(|_, _| true),
1565 event_listener: None,
1566 metrics: Arc::new(Metrics::noop()),
1567 });
1568 let hints = vec![Hint::Normal];
1569 fuzzy(cache, hints);
1570 }
1571
1572 #[test_log::test]
1573 fn test_sieve_cache_fuzzy() {
1574 let cache: RawCache<Sieve<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1575 capacity: 256,
1576 shards: 4,
1577 eviction_config: SieveConfig {},
1578 hash_builder: Default::default(),
1579 weighter: Arc::new(|_, _| 1),
1580 filter: Arc::new(|_, _| true),
1581 event_listener: None,
1582 metrics: Arc::new(Metrics::noop()),
1583 });
1584 let hints = vec![Hint::Normal];
1585 fuzzy(cache, hints);
1586 }
1587 }
1588}