1use std::{
16 any::Any,
17 fmt::Debug,
18 future::Future,
19 hash::Hash,
20 ops::Deref,
21 pin::Pin,
22 sync::{
23 atomic::{AtomicBool, Ordering},
24 Arc,
25 },
26 task::{Context, Poll},
27};
28
29use equivalent::Equivalent;
30use foyer_common::{
31 code::HashBuilder,
32 error::{Error, ErrorKind, Result},
33 event::{Event, EventListener},
34 metrics::Metrics,
35 properties::{Location, Properties, Source},
36 spawn::Spawner,
37 strict_assert,
38 utils::scope::Scope,
39};
40use futures_util::FutureExt as _;
41use itertools::Itertools;
42use parking_lot::{Mutex, RwLock};
43use pin_project::{pin_project, pinned_drop};
44
45use crate::{
46 eviction::{Eviction, Op},
47 indexer::{sentry::Sentry, Indexer},
48 inflight::{
49 Enqueue, FetchOrTake, FetchTarget, InflightManager, Notifier, OptionalFetch, OptionalFetchBuilder,
50 RequiredFetch, RequiredFetchBuilder, Waiter,
51 },
52 pipe::{ArcPipe, NoopPipe},
53 record::{Data, Record},
54 Piece,
55};
56
/// Callable that computes the weight of a key/value pair.
///
/// Any `Fn(&K, &V) -> usize` that is `Send + Sync + 'static` implements this trait through the blanket
/// implementation below, so plain closures and functions can be used directly.
pub trait Weighter<K, V>: Fn(&K, &V) -> usize + Send + Sync + 'static {}

impl<K, V, F: Fn(&K, &V) -> usize + Send + Sync + 'static> Weighter<K, V> for F {}
62
/// Callable that decides whether a key/value pair is admitted.
///
/// Any `Fn(&K, &V) -> bool` that is `Send + Sync + 'static` implements this trait through the blanket
/// implementation below. The cache marks an entry as phantom when its filter returns `false`.
pub trait Filter<K, V>: Fn(&K, &V) -> bool + Send + Sync + 'static {}

impl<K, V, F: Fn(&K, &V) -> bool + Send + Sync + 'static> Filter<K, V> for F {}
75
76pub struct RawCacheConfig<E, S>
77where
78 E: Eviction,
79 S: HashBuilder,
80{
81 pub capacity: usize,
82 pub shards: usize,
83 pub eviction_config: E::Config,
84 pub hash_builder: S,
85 pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
86 pub filter: Arc<dyn Filter<E::Key, E::Value>>,
87 pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
88 pub metrics: Arc<Metrics>,
89}
90
91struct RawCacheShard<E, S, I>
92where
93 E: Eviction,
94 S: HashBuilder,
95 I: Indexer<Eviction = E>,
96{
97 eviction: E,
98 indexer: Sentry<I>,
99
100 usage: usize,
101 entries: usize,
102 capacity: usize,
103
104 inflights: Arc<Mutex<InflightManager<E, S, I>>>,
105
106 metrics: Arc<Metrics>,
107 _event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
108}
109
110impl<E, S, I> RawCacheShard<E, S, I>
111where
112 E: Eviction,
113 S: HashBuilder,
114 I: Indexer<Eviction = E>,
115{
116 fn evict(&mut self, target: usize, garbages: &mut Vec<(Event, Arc<Record<E>>)>) {
118 while self.usage > target {
120 let evicted = match self.eviction.pop() {
121 Some(evicted) => evicted,
122 None => break,
123 };
124 self.metrics.memory_evict.increase(1);
125
126 let e = self.indexer.remove(evicted.hash(), evicted.key()).unwrap();
127 assert_eq!(Arc::as_ptr(&evicted), Arc::as_ptr(&e));
128
129 strict_assert!(!evicted.as_ref().is_in_indexer());
130 strict_assert!(!evicted.as_ref().is_in_eviction());
131
132 self.usage -= evicted.weight();
133 self.entries -= 1;
134 self.metrics.memory_entries.decrease(1);
135
136 garbages.push((Event::Evict, evicted));
137 }
138 }
139
140 #[expect(clippy::type_complexity)]
141 fn emplace(
142 &mut self,
143 record: Arc<Record<E>>,
144 garbages: &mut Vec<(Event, Arc<Record<E>>)>,
145 notifiers: &mut Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
146 ) {
147 *notifiers = self
148 .inflights
149 .lock()
150 .take(record.hash(), record.key(), None)
151 .unwrap_or_default();
152
153 if record.properties().phantom().unwrap_or_default() {
154 if let Some(old) = self.indexer.remove(record.hash(), record.key()) {
155 strict_assert!(!old.is_in_indexer());
156
157 if old.is_in_eviction() {
158 self.eviction.remove(&old);
159 }
160 strict_assert!(!old.is_in_eviction());
161
162 self.usage -= old.weight();
163 self.entries -= 1;
164 self.metrics.memory_entries.decrease(1);
165
166 garbages.push((Event::Replace, old));
167 }
168 record.inc_refs(notifiers.len() + 1);
169 garbages.push((Event::Remove, record));
170 self.metrics.memory_insert.increase(1);
171 return;
172 }
173
174 let weight = record.weight();
175 let old_usage = self.usage;
176
177 self.evict(self.capacity.saturating_sub(weight), garbages);
179
180 if let Some(old) = self.indexer.insert(record.clone()) {
182 self.metrics.memory_replace.increase(1);
183
184 strict_assert!(!old.is_in_indexer());
185
186 if old.is_in_eviction() {
187 self.eviction.remove(&old);
188 }
189 strict_assert!(!old.is_in_eviction());
190
191 self.usage -= old.weight();
192
193 garbages.push((Event::Replace, old));
194 } else {
195 self.metrics.memory_insert.increase(1);
196 self.entries += 1;
197 self.metrics.memory_entries.increase(1);
198 }
199 strict_assert!(record.is_in_indexer());
200
201 strict_assert!(!record.is_in_eviction());
202 self.eviction.push(record.clone());
203 strict_assert!(record.is_in_eviction());
204
205 self.usage += weight;
206 record.inc_refs(notifiers.len() + 1);
209
210 match self.usage.cmp(&old_usage) {
211 std::cmp::Ordering::Greater => self.metrics.memory_usage.increase((self.usage - old_usage) as _),
212 std::cmp::Ordering::Less => self.metrics.memory_usage.decrease((old_usage - self.usage) as _),
213 std::cmp::Ordering::Equal => {}
214 }
215 }
216
217 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::remove"))]
218 fn remove<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
219 where
220 Q: Hash + Equivalent<E::Key> + ?Sized,
221 {
222 let record = self.indexer.remove(hash, key)?;
223
224 if record.is_in_eviction() {
225 self.eviction.remove(&record);
226 }
227 strict_assert!(!record.is_in_indexer());
228 strict_assert!(!record.is_in_eviction());
229
230 self.usage -= record.weight();
231 self.entries -= 1;
232
233 self.metrics.memory_remove.increase(1);
234 self.metrics.memory_usage.decrease(record.weight() as _);
235 self.metrics.memory_entries.decrease(1);
236
237 record.inc_refs(1);
238
239 Some(record)
240 }
241
242 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_noop"))]
243 fn get_noop<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
244 where
245 Q: Hash + Equivalent<E::Key> + ?Sized,
246 {
247 self.get_inner(hash, key)
248 }
249
250 #[cfg_attr(
251 feature = "tracing",
252 fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")
253 )]
254 fn get_immutable<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
255 where
256 Q: Hash + Equivalent<E::Key> + ?Sized,
257 {
258 self.get_inner(hash, key)
259 .inspect(|record| self.acquire_immutable(record))
260 }
261
262 #[cfg_attr(
263 feature = "tracing",
264 fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")
265 )]
266 fn get_mutable<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
267 where
268 Q: Hash + Equivalent<E::Key> + ?Sized,
269 {
270 self.get_inner(hash, key).inspect(|record| self.acquire_mutable(record))
271 }
272
273 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_inner"))]
274 fn get_inner<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
275 where
276 Q: Hash + Equivalent<E::Key> + ?Sized,
277 {
278 let record = match self.indexer.get(hash, key).cloned() {
279 Some(record) => {
280 self.metrics.memory_hit.increase(1);
281 record
282 }
283 None => {
284 self.metrics.memory_miss.increase(1);
285 return None;
286 }
287 };
288
289 strict_assert!(record.is_in_indexer());
290
291 record.inc_refs(1);
292
293 Some(record)
294 }
295
296 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::clear"))]
297 fn clear(&mut self, garbages: &mut Vec<Arc<Record<E>>>) {
298 let records = self.indexer.drain().collect_vec();
299 self.eviction.clear();
300
301 let mut count = 0;
302
303 for record in records {
304 count += 1;
305 strict_assert!(!record.is_in_indexer());
306 strict_assert!(!record.is_in_eviction());
307
308 garbages.push(record);
309 }
310
311 self.entries = 0;
312 if count > 0 {
313 self.metrics.memory_entries.decrease(count);
314 self.metrics.memory_remove.increase(count);
315 }
316 }
317
318 #[cfg_attr(
319 feature = "tracing",
320 fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")
321 )]
322 fn acquire_immutable(&self, record: &Arc<Record<E>>) {
323 match E::acquire() {
324 Op::Immutable(f) => f(&self.eviction, record),
325 _ => unreachable!(),
326 }
327 }
328
329 #[cfg_attr(
330 feature = "tracing",
331 fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")
332 )]
333 fn acquire_mutable(&mut self, record: &Arc<Record<E>>) {
334 match E::acquire() {
335 Op::Mutable(mut f) => f(&mut self.eviction, record),
336 _ => unreachable!(),
337 }
338 }
339
340 #[cfg_attr(
341 feature = "tracing",
342 fastrace::trace(name = "foyer::memory::raw::shard::release_immutable")
343 )]
344 fn release_immutable(&self, record: &Arc<Record<E>>) {
345 match E::release() {
346 Op::Immutable(f) => f(&self.eviction, record),
347 _ => unreachable!(),
348 }
349 }
350
351 #[cfg_attr(
352 feature = "tracing",
353 fastrace::trace(name = "foyer::memory::raw::shard::release_mutable")
354 )]
355 fn release_mutable(&mut self, record: &Arc<Record<E>>) {
356 match E::release() {
357 Op::Mutable(mut f) => f(&mut self.eviction, record),
358 _ => unreachable!(),
359 }
360 }
361}
362
363struct RawCacheInner<E, S, I>
364where
365 E: Eviction,
366 S: HashBuilder,
367 I: Indexer<Eviction = E>,
368{
369 shards: Vec<RwLock<RawCacheShard<E, S, I>>>,
370
371 capacity: usize,
372
373 hash_builder: Arc<S>,
374 weighter: Arc<dyn Weighter<E::Key, E::Value>>,
375 filter: Arc<dyn Filter<E::Key, E::Value>>,
376
377 metrics: Arc<Metrics>,
378 event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
379}
380
381impl<E, S, I> RawCacheInner<E, S, I>
382where
383 E: Eviction,
384 S: HashBuilder,
385 I: Indexer<Eviction = E>,
386{
387 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::inner::clear"))]
388 fn clear(&self) {
389 let mut garbages = vec![];
390
391 self.shards
392 .iter()
393 .map(|shard| shard.write())
394 .for_each(|mut shard| shard.clear(&mut garbages));
395
396 if let Some(listener) = self.event_listener.as_ref() {
398 for record in garbages {
399 listener.on_leave(Event::Clear, record.key(), record.value());
400 }
401 }
402 }
403}
404
405pub struct RawCache<E, S, I>
406where
407 E: Eviction,
408 S: HashBuilder,
409 I: Indexer<Eviction = E>,
410{
411 pipe: ArcPipe<E::Key, E::Value, E::Properties>,
412 inner: Arc<RawCacheInner<E, S, I>>,
413}
414
415impl<E, S, I> Clone for RawCache<E, S, I>
416where
417 E: Eviction,
418 S: HashBuilder,
419 I: Indexer<Eviction = E>,
420{
421 fn clone(&self) -> Self {
422 Self {
423 pipe: self.pipe.clone(),
424 inner: self.inner.clone(),
425 }
426 }
427}
428
429impl<E, S, I> Drop for RawCacheInner<E, S, I>
430where
431 E: Eviction,
432 S: HashBuilder,
433 I: Indexer<Eviction = E>,
434{
435 fn drop(&mut self) {
436 self.clear();
437 }
438}
439
440impl<E, S, I> RawCache<E, S, I>
441where
442 E: Eviction,
443 S: HashBuilder,
444 I: Indexer<Eviction = E>,
445{
446 pub fn new(config: RawCacheConfig<E, S>) -> Self {
447 assert!(config.shards > 0, "shards must be greater than zero.");
448
449 let shard_capacities = (0..config.shards)
450 .map(|index| Self::shard_capacity_for(config.capacity, config.shards, index))
451 .collect_vec();
452
453 let shards = shard_capacities
454 .into_iter()
455 .map(|shard_capacity| RawCacheShard {
456 eviction: E::new(shard_capacity, &config.eviction_config),
457 indexer: Sentry::default(),
458 usage: 0,
459 entries: 0,
460 capacity: shard_capacity,
461 inflights: Arc::new(Mutex::new(InflightManager::new())),
462 metrics: config.metrics.clone(),
463 _event_listener: config.event_listener.clone(),
464 })
465 .map(RwLock::new)
466 .collect_vec();
467
468 Self {
469 pipe: Arc::new(NoopPipe::default()),
470 inner: Arc::new(RawCacheInner {
471 shards,
472 capacity: config.capacity,
473 hash_builder: Arc::new(config.hash_builder),
474 weighter: config.weighter,
475 filter: config.filter,
476 metrics: config.metrics,
477 event_listener: config.event_listener,
478 }),
479 }
480 }
481
482 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::resize"))]
483 pub fn resize(&self, capacity: usize) -> Result<()> {
484 let shards = self.inner.shards.len();
485 assert!(shards > 0, "shards must be greater than zero.");
486
487 let shard_capacities = (0..shards)
488 .map(|index| Self::shard_capacity_for(capacity, shards, index))
489 .collect_vec();
490
491 let handles = shard_capacities
492 .into_iter()
493 .enumerate()
494 .map(|(i, shard_capacity)| {
495 let pipe = self.pipe.clone();
496 let inner = self.inner.clone();
497 std::thread::spawn(move || {
498 let mut garbages = vec![];
499 let res = inner.shards[i].write().with(|mut shard| {
500 shard.eviction.update(shard_capacity, None).inspect(|_| {
501 shard.capacity = shard_capacity;
502 shard.evict(shard_capacity, &mut garbages)
503 })
504 });
505 let piped = pipe.is_enabled();
507 if inner.event_listener.is_some() || piped {
508 for (event, record) in garbages {
509 if let Some(listener) = inner.event_listener.as_ref() {
510 listener.on_leave(event, record.key(), record.value())
511 }
512 if piped && event == Event::Evict {
513 pipe.send(Piece::new(record));
514 }
515 }
516 }
517 res
518 })
519 })
520 .collect_vec();
521
522 let errs = handles
523 .into_iter()
524 .map(|handle| handle.join().unwrap())
525 .filter(|res| res.is_err())
526 .map(|res| res.unwrap_err())
527 .collect_vec();
528 if !errs.is_empty() {
529 let mut e = Error::new(ErrorKind::Config, "resize raw cache failed");
530 for err in errs {
531 e = e.with_context("reason", format!("{err}"));
532 }
533 return Err(e);
534 }
535
536 Ok(())
537 }
538
539 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert"))]
540 pub fn insert(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
541 self.insert_with_properties(key, value, Default::default())
542 }
543
544 #[cfg_attr(
545 feature = "tracing",
546 fastrace::trace(name = "foyer::memory::raw::insert_with_properties")
547 )]
548 pub fn insert_with_properties(
549 &self,
550 key: E::Key,
551 value: E::Value,
552 properties: E::Properties,
553 ) -> RawCacheEntry<E, S, I> {
554 self.insert_with_properties_inner(key, value, properties, Source::Outer)
555 }
556
557 fn insert_with_properties_inner(
558 &self,
559 key: E::Key,
560 value: E::Value,
561 mut properties: E::Properties,
562 source: Source,
563 ) -> RawCacheEntry<E, S, I> {
564 let hash = self.inner.hash_builder.hash_one(&key);
565 let weight = (self.inner.weighter)(&key, &value);
566 if !(self.inner.filter)(&key, &value) {
567 properties = properties.with_phantom(true);
568 }
569 if let Some(location) = properties.location() {
570 if location == Location::OnDisk {
571 properties = properties.with_phantom(true);
572 }
573 }
574 let record = Arc::new(Record::new(Data {
575 key,
576 value,
577 properties,
578 hash,
579 weight,
580 }));
581 self.insert_inner(record, source)
582 }
583
584 #[doc(hidden)]
585 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_piece"))]
586 pub fn insert_piece(&self, piece: Piece<E::Key, E::Value, E::Properties>) -> RawCacheEntry<E, S, I> {
587 self.insert_inner(piece.into_record(), Source::Memory)
588 }
589
590 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_inner"))]
591 fn insert_inner(&self, record: Arc<Record<E>>, source: Source) -> RawCacheEntry<E, S, I> {
592 let mut garbages = vec![];
593 let mut notifiers = vec![];
594
595 self.inner.shards[self.shard(record.hash())]
596 .write()
597 .with(|mut shard| shard.emplace(record.clone(), &mut garbages, &mut notifiers));
598
599 for notifier in notifiers {
601 let _ = notifier.send(Ok(Some(RawCacheEntry {
602 pipe: self.pipe.clone(),
603 record: record.clone(),
604 inner: self.inner.clone(),
605 source,
606 })));
607 }
608
609 let piped = self.pipe.is_enabled();
611 if self.inner.event_listener.is_some() || piped {
612 for (event, record) in garbages {
613 if let Some(listener) = self.inner.event_listener.as_ref() {
614 listener.on_leave(event, record.key(), record.value())
615 }
616 if piped && event == Event::Evict {
617 self.pipe.send(Piece::new(record));
618 }
619 }
620 }
621
622 RawCacheEntry {
623 record,
624 pipe: self.pipe.clone(),
625 inner: self.inner.clone(),
626 source,
627 }
628 }
629
630 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::evict_all"))]
632 pub fn evict_all(&self) {
633 let mut garbages = vec![];
634 for shard in self.inner.shards.iter() {
635 shard.write().evict(0, &mut garbages);
636 }
637
638 let piped = self.pipe.is_enabled();
640 if self.inner.event_listener.is_some() || piped {
641 for (event, record) in garbages {
642 if let Some(listener) = self.inner.event_listener.as_ref() {
643 listener.on_leave(event, record.key(), record.value())
644 }
645 if piped && event == Event::Evict {
646 self.pipe.send(Piece::new(record));
647 }
648 }
649 }
650 }
651
652 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::flush"))]
657 pub async fn flush(&self) {
658 let mut garbages = vec![];
659 for shard in self.inner.shards.iter() {
660 shard.write().evict(0, &mut garbages);
661 }
662
663 let piped = self.pipe.is_enabled();
665
666 if let Some(listener) = self.inner.event_listener.as_ref() {
667 for (event, record) in garbages.iter() {
668 listener.on_leave(*event, record.key(), record.value());
669 }
670 }
671 if piped {
672 let pieces = garbages.into_iter().map(|(_, record)| Piece::new(record)).collect_vec();
673 self.pipe.flush(pieces).await;
674 }
675 }
676
677 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::remove"))]
678 pub fn remove<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
679 where
680 Q: Hash + Equivalent<E::Key> + ?Sized,
681 {
682 let hash = self.inner.hash_builder.hash_one(key);
683
684 self.inner.shards[self.shard(hash)]
685 .write()
686 .with(|mut shard| {
687 shard.remove(hash, key).map(|record| RawCacheEntry {
688 pipe: self.pipe.clone(),
689 inner: self.inner.clone(),
690 record,
691 source: Source::Memory,
692 })
693 })
694 .inspect(|record| {
695 if let Some(listener) = self.inner.event_listener.as_ref() {
697 listener.on_leave(Event::Remove, record.key(), record.value());
698 }
699 })
700 }
701
702 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get"))]
703 pub fn get<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
704 where
705 Q: Hash + Equivalent<E::Key> + ?Sized,
706 {
707 let hash = self.inner.hash_builder.hash_one(key);
708
709 let record = match E::acquire() {
710 Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
711 Op::Immutable(_) => self.inner.shards[self.shard(hash)]
712 .read()
713 .with(|shard| shard.get_immutable(hash, key)),
714 Op::Mutable(_) => self.inner.shards[self.shard(hash)]
715 .write()
716 .with(|mut shard| shard.get_mutable(hash, key)),
717 }?;
718
719 Some(RawCacheEntry {
720 pipe: self.pipe.clone(),
721 inner: self.inner.clone(),
722 record,
723 source: Source::Memory,
724 })
725 }
726
727 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::contains"))]
728 pub fn contains<Q>(&self, key: &Q) -> bool
729 where
730 Q: Hash + Equivalent<E::Key> + ?Sized,
731 {
732 let hash = self.inner.hash_builder.hash_one(key);
733
734 self.inner.shards[self.shard(hash)]
735 .read()
736 .with(|shard| shard.indexer.get(hash, key).is_some())
737 }
738
739 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::touch"))]
740 pub fn touch<Q>(&self, key: &Q) -> bool
741 where
742 Q: Hash + Equivalent<E::Key> + ?Sized,
743 {
744 let hash = self.inner.hash_builder.hash_one(key);
745
746 match E::acquire() {
747 Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
748 Op::Immutable(_) => self.inner.shards[self.shard(hash)]
749 .read()
750 .with(|shard| shard.get_immutable(hash, key)),
751 Op::Mutable(_) => self.inner.shards[self.shard(hash)]
752 .write()
753 .with(|mut shard| shard.get_mutable(hash, key)),
754 }
755 .is_some()
756 }
757
758 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::clear"))]
759 pub fn clear(&self) {
760 self.inner.clear();
761 }
762
763 pub fn capacity(&self) -> usize {
764 self.inner.capacity
765 }
766
767 pub fn usage(&self) -> usize {
768 self.inner.shards.iter().map(|shard| shard.read().usage).sum()
769 }
770
771 pub fn entries(&self) -> usize {
772 self.inner.shards.iter().map(|shard| shard.read().entries).sum()
773 }
774
775 pub fn metrics(&self) -> &Metrics {
776 &self.inner.metrics
777 }
778
779 pub fn hash_builder(&self) -> &Arc<S> {
780 &self.inner.hash_builder
781 }
782
783 pub fn shards(&self) -> usize {
784 self.inner.shards.len()
785 }
786
787 pub(crate) fn with_pipe(mut self, pipe: ArcPipe<E::Key, E::Value, E::Properties>) -> Self {
788 self.pipe = pipe;
789 self
790 }
791
792 fn shard(&self, hash: u64) -> usize {
793 hash as usize % self.inner.shards.len()
794 }
795
/// Returns the capacity of shard `index` when `total` is split across `shards` shards.
///
/// Every shard gets `total / shards`; the first `total % shards` shards get one extra unit, so the shard
/// capacities always add up to `total`.
///
/// # Panics
///
/// Panics if `shards` is zero (division by zero).
fn shard_capacity_for(total: usize, shards: usize, index: usize) -> usize {
    let (base, remainder) = (total / shards, total % shards);
    if index < remainder {
        base + 1
    } else {
        base
    }
}
801}
802
803pub struct RawCacheEntry<E, S, I>
804where
805 E: Eviction,
806 S: HashBuilder,
807 I: Indexer<Eviction = E>,
808{
809 pipe: ArcPipe<E::Key, E::Value, E::Properties>,
810 inner: Arc<RawCacheInner<E, S, I>>,
811 record: Arc<Record<E>>,
812 source: Source,
813}
814
815impl<E, S, I> Debug for RawCacheEntry<E, S, I>
816where
817 E: Eviction,
818 S: HashBuilder,
819 I: Indexer<Eviction = E>,
820{
821 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
822 f.debug_struct("RawCacheEntry").field("record", &self.record).finish()
823 }
824}
825
826impl<E, S, I> Drop for RawCacheEntry<E, S, I>
827where
828 E: Eviction,
829 S: HashBuilder,
830 I: Indexer<Eviction = E>,
831{
832 fn drop(&mut self) {
833 let hash = self.record.hash();
834 let shard = &self.inner.shards[hash as usize % self.inner.shards.len()];
835
836 if self.record.dec_refs(1) == 0 {
837 if self.record.properties().phantom().unwrap_or_default() {
838 if let Some(listener) = self.inner.event_listener.as_ref() {
839 listener.on_leave(Event::Evict, self.record.key(), self.record.value());
840 }
841 if self.pipe.is_enabled() {
842 self.pipe.send(Piece::new(self.record.clone()));
843 }
844 return;
845 }
846
847 match E::release() {
848 Op::Noop => {}
849 Op::Immutable(_) => shard.read().with(|shard| shard.release_immutable(&self.record)),
850 Op::Mutable(_) => shard.write().with(|mut shard| shard.release_mutable(&self.record)),
851 }
852 }
853 }
854}
855
856impl<E, S, I> Clone for RawCacheEntry<E, S, I>
857where
858 E: Eviction,
859 S: HashBuilder,
860 I: Indexer<Eviction = E>,
861{
862 fn clone(&self) -> Self {
863 self.record.inc_refs(1);
864 Self {
865 pipe: self.pipe.clone(),
866 inner: self.inner.clone(),
867 record: self.record.clone(),
868 source: self.source,
869 }
870 }
871}
872
873impl<E, S, I> Deref for RawCacheEntry<E, S, I>
874where
875 E: Eviction,
876 S: HashBuilder,
877 I: Indexer<Eviction = E>,
878{
879 type Target = E::Value;
880
881 fn deref(&self) -> &Self::Target {
882 self.value()
883 }
884}
885
886unsafe impl<E, S, I> Send for RawCacheEntry<E, S, I>
887where
888 E: Eviction,
889 S: HashBuilder,
890 I: Indexer<Eviction = E>,
891{
892}
893
894unsafe impl<E, S, I> Sync for RawCacheEntry<E, S, I>
895where
896 E: Eviction,
897 S: HashBuilder,
898 I: Indexer<Eviction = E>,
899{
900}
901
902impl<E, S, I> RawCacheEntry<E, S, I>
903where
904 E: Eviction,
905 S: HashBuilder,
906 I: Indexer<Eviction = E>,
907{
908 pub fn hash(&self) -> u64 {
909 self.record.hash()
910 }
911
912 pub fn key(&self) -> &E::Key {
913 self.record.key()
914 }
915
916 pub fn value(&self) -> &E::Value {
917 self.record.value()
918 }
919
920 pub fn properties(&self) -> &E::Properties {
921 self.record.properties()
922 }
923
924 pub fn weight(&self) -> usize {
925 self.record.weight()
926 }
927
928 pub fn refs(&self) -> usize {
929 self.record.refs()
930 }
931
932 pub fn is_outdated(&self) -> bool {
933 !self.record.is_in_indexer()
934 }
935
936 pub fn piece(&self) -> Piece<E::Key, E::Value, E::Properties> {
937 Piece::new(self.record.clone())
938 }
939
940 pub fn source(&self) -> Source {
941 self.source
942 }
943}
944
945impl<E, S, I> RawCache<E, S, I>
946where
947 E: Eviction,
948 S: HashBuilder,
949 I: Indexer<Eviction = E>,
950{
951 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get_or_fetch"))]
952 pub fn get_or_fetch<Q, F, FU, IT, ER>(&self, key: &Q, fetch: F) -> RawGetOrFetch<E, S, I>
953 where
954 Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
955 F: FnOnce() -> FU,
956 FU: Future<Output = std::result::Result<IT, ER>> + Send + 'static,
957 IT: Into<FetchTarget<E::Key, E::Value, E::Properties>>,
958 ER: Into<anyhow::Error>,
959 {
960 let fut = fetch();
961 self.get_or_fetch_inner(
962 key,
963 || None,
964 || {
965 Some(Box::new(|_| {
966 async {
967 match fut.await {
968 Ok(it) => Ok(it.into()),
969 Err(e) => Err(Error::new(ErrorKind::External, "fetch failed").with_source(e)),
970 }
971 }
972 .boxed()
973 }))
974 },
975 (),
976 &Spawner::current(),
977 )
978 }
979
980 #[doc(hidden)]
984 #[cfg_attr(
985 feature = "tracing",
986 fastrace::trace(name = "foyer::memory::raw::get_or_fetch_inner")
987 )]
988 pub fn get_or_fetch_inner<Q, C, FO, FR>(
989 &self,
990 key: &Q,
991 fo: FO,
992 fr: FR,
993 ctx: C,
994 spawner: &Spawner,
995 ) -> RawGetOrFetch<E, S, I>
996 where
997 Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
998 C: Any + Send + Sync + 'static,
999 FO: FnOnce() -> Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1000 FR: FnOnce() -> Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1001 {
1002 let hash = self.inner.hash_builder.hash_one(key);
1003
1004 let extract = |key: &Q, opt: Option<Arc<Record<E>>>, inflights: &Arc<Mutex<InflightManager<E, S, I>>>| {
1006 opt.map(|record| {
1007 RawGetOrFetch::Hit(Some(RawCacheEntry {
1008 pipe: self.pipe.clone(),
1009 inner: self.inner.clone(),
1010 record,
1011 source: Source::Memory,
1012 }))
1013 })
1014 .unwrap_or_else(|| match inflights.lock().enqueue(hash, key, fr()) {
1015 Enqueue::Lead {
1016 id,
1017 close,
1018 waiter,
1019 required_fetch_builder,
1020 } => {
1021 let fetch = RawFetch {
1022 state: RawFetchState::Init {
1023 optional_fetch_builder: fo(),
1024 required_fetch_builder,
1025 },
1026 id,
1027 hash,
1028 key: Some(key.to_owned()),
1029 ctx,
1030 cache: self.clone(),
1031 inflights: inflights.clone(),
1032 close,
1033 };
1034 spawner.spawn(fetch);
1035 RawGetOrFetch::Miss(RawWait { waiter })
1036 }
1037 Enqueue::Wait(waiter) => RawGetOrFetch::Miss(RawWait { waiter }),
1038 })
1039 };
1040
1041 let res = match E::acquire() {
1042 Op::Noop => self.inner.shards[self.shard(hash)]
1043 .read()
1044 .with(|shard| extract(key, shard.get_noop(hash, key), &shard.inflights)),
1045 Op::Immutable(_) => self.inner.shards[self.shard(hash)]
1046 .read()
1047 .with(|shard| extract(key, shard.get_immutable(hash, key), &shard.inflights)),
1048 Op::Mutable(_) => self.inner.shards[self.shard(hash)]
1049 .write()
1050 .with(|mut shard| extract(key, shard.get_mutable(hash, key), &shard.inflights)),
1051 };
1052
1053 res
1054 }
1055}
1056
1057#[must_use]
1058#[pin_project(project = RawGetOrFetchProj)]
1059pub enum RawGetOrFetch<E, S, I>
1060where
1061 E: Eviction,
1062 S: HashBuilder,
1063 I: Indexer<Eviction = E>,
1064{
1065 Hit(Option<RawCacheEntry<E, S, I>>),
1066 Miss(#[pin] RawWait<E, S, I>),
1067}
1068
1069impl<E, S, I> Debug for RawGetOrFetch<E, S, I>
1070where
1071 E: Eviction,
1072 S: HashBuilder,
1073 I: Indexer<Eviction = E>,
1074{
1075 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1076 match self {
1077 Self::Hit(e) => f.debug_tuple("Hit").field(e).finish(),
1078 Self::Miss(fut) => f.debug_tuple("Miss").field(fut).finish(),
1079 }
1080 }
1081}
1082
1083impl<E, S, I> Future for RawGetOrFetch<E, S, I>
1084where
1085 E: Eviction,
1086 S: HashBuilder,
1087 I: Indexer<Eviction = E>,
1088{
1089 type Output = Result<Option<RawCacheEntry<E, S, I>>>;
1090
1091 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1092 let this = self.project();
1093 match this {
1094 RawGetOrFetchProj::Hit(opt) => {
1095 assert!(opt.is_some(), "entry is already taken");
1096 Poll::Ready(Ok(opt.take()))
1097 }
1098 RawGetOrFetchProj::Miss(fut) => fut.poll(cx),
1099 }
1100 }
1101}
1102
1103impl<E, S, I> RawGetOrFetch<E, S, I>
1104where
1105 E: Eviction,
1106 S: HashBuilder,
1107 I: Indexer<Eviction = E>,
1108{
1109 pub fn need_await(&self) -> bool {
1110 matches!(self, Self::Miss(_))
1111 }
1112
1113 #[expect(clippy::allow_attributes)]
1114 #[allow(clippy::result_large_err)]
1115 pub fn try_unwrap(self) -> std::result::Result<Option<RawCacheEntry<E, S, I>>, Self> {
1116 match self {
1117 Self::Hit(opt) => {
1118 assert!(opt.is_some(), "entry is already taken");
1119 Ok(opt)
1120 }
1121 Self::Miss(_) => Err(self),
1122 }
1123 }
1124}
1125
/// Alias for an `Option` slot, used for the `key` field of `RawFetch`.
// NOTE(review): the name suggests the value is meant to be taken at most once — confirm against the use sites.
type Once<T> = Option<T>;
1127
1128#[must_use]
1129enum Try<E, S, I, C>
1130where
1131 E: Eviction,
1132 S: HashBuilder,
1133 I: Indexer<Eviction = E>,
1134 C: Any + Send + 'static,
1135{
1136 Noop,
1137 SetStateAndContinue(RawFetchState<E, S, I, C>),
1138 Ready,
1139}
1140
/// Applies a [`Try`] outcome to the state slot of the fetch state machine.
///
/// Must be expanded inside a loop within a function returning `Poll<()>`:
/// * `Try::Noop` falls through,
/// * `Try::SetStateAndContinue(state)` stores `state` in the slot and `continue`s the loop,
/// * `Try::Ready` stores `RawFetchState::Ready` and returns `Poll::Ready(())`.
///
/// The `method(args..)` form is shorthand for evaluating `Self::method(args..)`.
macro_rules! handle_try {
    ($slot:expr, $step:ident($($arg:expr),* $(,)?)) => {
        handle_try! { $slot, Self::$step($($arg),*) }
    };

    ($slot:expr, $outcome:expr) => {
        match $outcome {
            Try::Noop => {}
            Try::SetStateAndContinue(next) => {
                $slot = next;
                continue;
            }
            Try::Ready => {
                $slot = RawFetchState::Ready;
                return Poll::Ready(());
            }
        }
    };
}
1160
1161#[expect(clippy::type_complexity)]
1162pub enum RawFetchState<E, S, I, C>
1163where
1164 E: Eviction,
1165 S: HashBuilder,
1166 I: Indexer<Eviction = E>,
1167{
1168 Init {
1169 optional_fetch_builder: Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1170 required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1171 },
1172 FetchOptional {
1173 optional_fetch: OptionalFetch<FetchTarget<E::Key, E::Value, E::Properties>>,
1174 required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1175 },
1176 FetchRequired {
1177 required_fetch: RequiredFetch<FetchTarget<E::Key, E::Value, E::Properties>>,
1178 },
1179 Notify {
1180 res: Option<Result<Option<RawCacheEntry<E, S, I>>>>,
1181 notifiers: Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
1182 },
1183 Ready,
1184}
1185
1186impl<E, S, I, C> Debug for RawFetchState<E, S, I, C>
1187where
1188 E: Eviction,
1189 S: HashBuilder,
1190 I: Indexer<Eviction = E>,
1191{
1192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1193 match self {
1194 Self::Init { .. } => f.debug_struct("Init").finish(),
1195 Self::FetchOptional { .. } => f.debug_struct("Optional").finish(),
1196 Self::FetchRequired { .. } => f.debug_struct("Required").finish(),
1197 Self::Notify { res, .. } => f.debug_struct("Notify").field("res", res).finish(),
1198 Self::Ready => f.debug_struct("Ready").finish(),
1199 }
1200 }
1201}
1202
1203#[pin_project]
1204pub struct RawWait<E, S, I>
1205where
1206 E: Eviction,
1207 S: HashBuilder,
1208 I: Indexer<Eviction = E>,
1209{
1210 waiter: Waiter<Option<RawCacheEntry<E, S, I>>>,
1211}
1212
1213impl<E, S, I> Debug for RawWait<E, S, I>
1214where
1215 E: Eviction,
1216 S: HashBuilder,
1217 I: Indexer<Eviction = E>,
1218{
1219 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1220 f.debug_struct("RawWait").field("waiter", &self.waiter).finish()
1221 }
1222}
1223
1224impl<E, S, I> Future for RawWait<E, S, I>
1225where
1226 E: Eviction,
1227 S: HashBuilder,
1228 I: Indexer<Eviction = E>,
1229{
1230 type Output = Result<Option<RawCacheEntry<E, S, I>>>;
1231
1232 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1233 let this = self.project();
1234 this.waiter.poll_unpin(cx).map(|r| match r {
1237 Ok(r) => r,
1238 Err(e) => Err(Error::new(ErrorKind::ChannelClosed, "waiter channel closed").with_source(e)),
1239 })
1240 }
1241}
1242
/// Future that drives a single fetch through the [`RawFetchState`] state machine.
///
/// Its `Future` implementation resolves to `()`. Fetched targets are inserted into `cache`, and
/// results are sent to notifiers obtained from `inflights`. Dropping the future in a state other than
/// `Notify` or `Ready` sends a `TaskCancelled` error to any notifiers still registered under its
/// `(hash, key, id)`.
#[pin_project(PinnedDrop)]
pub struct RawFetch<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Current step of the fetch state machine.
    state: RawFetchState<E, S, I, C>,
    /// Identifier passed to the inflight manager together with `hash` and `key`.
    id: usize,
    /// Hash of `key`, passed to the inflight manager and attached to cancellation errors.
    hash: u64,
    /// Key being fetched; taken out when a fetched entry is inserted into the cache.
    key: Once<E::Key>,
    /// Context handed by mutable reference to the fetch builders.
    ctx: C,
    /// Cache that receives the fetched entry or piece.
    cache: RawCache<E, S, I>,
    /// Inflight manager shared behind a mutex.
    inflights: Arc<Mutex<InflightManager<E, S, I>>>,
    /// When set, polling in the `FetchOptional` / `FetchRequired` states returns `Poll::Ready(())`
    /// without polling the underlying fetch.
    close: Arc<AtomicBool>,
}
1259
1260impl<E, S, I, C> Debug for RawFetch<E, S, I, C>
1261where
1262 E: Eviction,
1263 S: HashBuilder,
1264 I: Indexer<Eviction = E>,
1265{
1266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1267 f.debug_struct("RawFetch")
1268 .field("state", &self.state)
1269 .field("id", &self.id)
1270 .field("hash", &self.hash)
1271 .finish()
1272 }
1273}
1274
impl<E, S, I, C> Future for RawFetch<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
    C: Any + Send + 'static,
{
    type Output = ();

    /// Drives the fetch state machine.
    ///
    /// Each loop iteration dispatches on the current state and hands the `Try` returned by the matching
    /// helper to `handle_try!`.
    ///
    /// NOTE(review): `handle_try!` is defined outside this block. It presumably applies the returned
    /// `Try` to `*this.state` (fall through on `Noop`, store the new state and re-enter the loop on
    /// `SetStateAndContinue`, finish on `Ready`) — confirm against the macro definition.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.as_mut().project();
        loop {
            match this.state {
                RawFetchState::Init {
                    optional_fetch_builder,
                    required_fetch_builder,
                } => {
                    // Start the optional fetch if a builder for it exists.
                    handle_try! { *this.state, try_set_optional(optional_fetch_builder, required_fetch_builder, this.ctx) }
                    // Otherwise go for the required fetch; `Ok(None)` is the result reported to notifiers
                    // if no required fetch is left to run.
                    handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), this.inflights, Ok(None)) }
                }
                RawFetchState::FetchOptional {
                    optional_fetch,
                    required_fetch_builder,
                } => {
                    // Stop without polling the fetch once the close flag is set.
                    if this.close.load(Ordering::Relaxed) {
                        return Poll::Ready(());
                    }
                    match optional_fetch.poll_unpin(cx) {
                        Poll::Pending => return Poll::Pending,
                        // The optional fetch produced a target: insert it with `Source::Disk`.
                        Poll::Ready(Ok(Some(target))) => {
                            handle_try! {*this.state, handle_target(target, this.key, this.cache, Source::Disk) }
                        }
                        // The optional fetch found nothing: fall back to the required fetch.
                        Poll::Ready(Ok(None)) => {
                            handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), &this.inflights, Ok(None)) }
                        }
                        // The optional fetch failed: still try the required fetch, carrying the error as
                        // the result to report if no required fetch is left to run.
                        Poll::Ready(Err(e)) => {
                            handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), &this.inflights, Err(e)) }
                        }
                    }
                }
                RawFetchState::FetchRequired { required_fetch } => {
                    // Stop without polling the fetch once the close flag is set.
                    if this.close.load(Ordering::Relaxed) {
                        return Poll::Ready(());
                    }
                    match required_fetch.poll_unpin(cx) {
                        Poll::Pending => return Poll::Pending,
                        // The required fetch produced a target: insert it with `Source::Outer`.
                        Poll::Ready(Ok(target)) => {
                            handle_try! { *this.state, handle_target(target, this.key, this.cache, Source::Outer) }
                        }
                        // The required fetch failed: forward the error to the registered notifiers.
                        Poll::Ready(Err(e)) => {
                            handle_try! { *this.state, handle_error(e, *this.id, *this.hash, this.key.as_ref().unwrap(), this.inflights) }
                        }
                    }
                }
                RawFetchState::Notify { res, notifiers } => {
                    // `res` is taken out of its `Option`; the `unwrap` panics if it was already taken.
                    handle_try! { *this.state, handle_notify(res.take().unwrap(), notifiers) }
                }
                RawFetchState::Ready => return Poll::Ready(()),
            }
        }
    }
}
1337
1338impl<E, S, I, C> RawFetch<E, S, I, C>
1339where
1340 E: Eviction,
1341 S: HashBuilder,
1342 I: Indexer<Eviction = E>,
1343 C: Any + Send + 'static,
1344{
1345 #[expect(clippy::type_complexity)]
1346 fn try_set_optional(
1347 optional_fetch_builder: &mut Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1348 required_fetch_builder: &mut Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1349 ctx: &mut C,
1350 ) -> Try<E, S, I, C> {
1351 match optional_fetch_builder.take() {
1352 None => Try::Noop,
1353 Some(optional_fetch_builder) => {
1354 let optional_fetch = optional_fetch_builder(ctx);
1355 Try::SetStateAndContinue(RawFetchState::FetchOptional {
1356 optional_fetch,
1357 required_fetch_builder: required_fetch_builder.take(),
1358 })
1359 }
1360 }
1361 }
1362
1363 #[expect(clippy::type_complexity)]
1364 fn try_set_required(
1365 required_fetch_builder: &mut Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1366 ctx: &mut C,
1367 id: usize,
1368 hash: u64,
1369 key: &E::Key,
1370 inflights: &Arc<Mutex<InflightManager<E, S, I>>>,
1371 res_no_fetch: Result<Option<RawCacheEntry<E, S, I>>>,
1372 ) -> Try<E, S, I, C> {
1373 match required_fetch_builder.take() {
1375 None => {}
1376 Some(required_fetch_builder) => {
1377 let required_fetch = required_fetch_builder(ctx);
1378 return Try::SetStateAndContinue(RawFetchState::FetchRequired { required_fetch });
1379 }
1380 }
1381 let fetch_or_take = match inflights.lock().fetch_or_take(hash, key, id) {
1383 Some(fetch_or_take) => fetch_or_take,
1384 None => return Try::Ready,
1385 };
1386 match fetch_or_take {
1387 FetchOrTake::Fetch(required_fetch_builder) => {
1388 let required_fetch = required_fetch_builder(ctx);
1389 Try::SetStateAndContinue(RawFetchState::FetchRequired { required_fetch })
1390 }
1391 FetchOrTake::Notifiers(notifiers) => Try::SetStateAndContinue(RawFetchState::Notify {
1392 res: Some(res_no_fetch),
1393 notifiers,
1394 }),
1395 }
1396 }
1397
1398 fn handle_target(
1399 target: FetchTarget<E::Key, E::Value, E::Properties>,
1400 key: &mut Once<E::Key>,
1401 cache: &RawCache<E, S, I>,
1402 source: Source,
1403 ) -> Try<E, S, I, C> {
1404 match target {
1405 FetchTarget::Entry { value, properties } => {
1406 let key = key.take().unwrap();
1407 cache.insert_with_properties_inner(key, value, properties, source);
1408 }
1409 FetchTarget::Piece(piece) => {
1410 cache.insert_piece(piece);
1411 }
1412 }
1413 Try::Ready
1414 }
1415
1416 fn handle_error(
1417 e: Error,
1418 id: usize,
1419 hash: u64,
1420 key: &E::Key,
1421 inflights: &Arc<Mutex<InflightManager<E, S, I>>>,
1422 ) -> Try<E, S, I, C> {
1423 let notifiers = match inflights.lock().take(hash, key, Some(id)) {
1424 Some(notifiers) => notifiers,
1425 None => {
1426 return Try::Ready;
1427 }
1428 };
1429 Try::SetStateAndContinue(RawFetchState::Notify {
1430 res: Some(Err(e)),
1431 notifiers,
1432 })
1433 }
1434
1435 #[expect(clippy::type_complexity)]
1436 fn handle_notify(
1437 res: Result<Option<RawCacheEntry<E, S, I>>>,
1438 notifiers: &mut Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
1439 ) -> Try<E, S, I, C> {
1440 match res {
1441 Ok(e) => {
1442 for notifier in notifiers.drain(..) {
1443 let _ = notifier.send(Ok(e.clone()));
1444 }
1445 }
1446 Err(e) => {
1447 for notifier in notifiers.drain(..) {
1448 let _ = notifier.send(Err(e.clone()));
1449 }
1450 }
1451 }
1452 Try::Ready
1453 }
1454}
1455
1456#[pinned_drop]
1457impl<E, S, I, C> PinnedDrop for RawFetch<E, S, I, C>
1458where
1459 E: Eviction,
1460 S: HashBuilder,
1461 I: Indexer<Eviction = E>,
1462{
1463 fn drop(self: Pin<&mut Self>) {
1464 let this = self.project();
1465 match this.state {
1466 RawFetchState::Notify { .. } | RawFetchState::Ready => return,
1467 RawFetchState::Init { .. } | RawFetchState::FetchOptional { .. } | RawFetchState::FetchRequired { .. } => {}
1468 }
1469 if let Some(notifiers) = this
1470 .inflights
1471 .lock()
1472 .take(*this.hash, this.key.as_ref().unwrap(), Some(*this.id))
1473 {
1474 for notifier in notifiers {
1475 let _ =
1476 notifier
1477 .send(Err(Error::new(ErrorKind::TaskCancelled, "fetch task cancelled")
1478 .with_context("hash", *this.hash)));
1479 }
1480 }
1481 }
1482}
1483
1484#[cfg(test)]
1485mod tests {
1486 use foyer_common::hasher::ModHasher;
1487 use rand::{rngs::SmallRng, seq::IndexedRandom, RngCore, SeedableRng};
1488
1489 use super::*;
1490 use crate::{
1491 eviction::{
1492 fifo::{Fifo, FifoConfig},
1493 lfu::{Lfu, LfuConfig},
1494 lru::{Lru, LruConfig},
1495 s3fifo::{S3Fifo, S3FifoConfig},
1496 sieve::{Sieve, SieveConfig},
1497 test_utils::TestProperties,
1498 },
1499 indexer::hash_table::HashTableIndexer,
1500 test_utils::PiecePipe,
1501 };
1502
1503 fn is_send_sync_static<T: Send + Sync + 'static>() {}
1504
1505 #[test]
1506 fn test_send_sync_static() {
1507 is_send_sync_static::<RawCache<Fifo<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1508 is_send_sync_static::<RawCache<S3Fifo<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1509 is_send_sync_static::<RawCache<Lfu<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1510 is_send_sync_static::<RawCache<Lru<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1511 is_send_sync_static::<RawCache<Sieve<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1512 }
1513
1514 #[expect(clippy::type_complexity)]
1515 fn fifo_cache_for_test(
1516 ) -> RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Fifo<u64, u64, TestProperties>>> {
1517 RawCache::new(RawCacheConfig {
1518 capacity: 256,
1519 shards: 4,
1520 eviction_config: FifoConfig::default(),
1521 hash_builder: Default::default(),
1522 weighter: Arc::new(|_, _| 1),
1523 filter: Arc::new(|_, _| true),
1524 event_listener: None,
1525 metrics: Arc::new(Metrics::noop()),
1526 })
1527 }
1528
1529 #[expect(clippy::type_complexity)]
1530 fn s3fifo_cache_for_test(
1531 ) -> RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<S3Fifo<u64, u64, TestProperties>>> {
1532 RawCache::new(RawCacheConfig {
1533 capacity: 256,
1534 shards: 4,
1535 eviction_config: S3FifoConfig::default(),
1536 hash_builder: Default::default(),
1537 weighter: Arc::new(|_, _| 1),
1538 filter: Arc::new(|_, _| true),
1539 event_listener: None,
1540 metrics: Arc::new(Metrics::noop()),
1541 })
1542 }
1543
1544 #[expect(clippy::type_complexity)]
1545 fn lru_cache_for_test(
1546 ) -> RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lru<u64, u64, TestProperties>>> {
1547 RawCache::new(RawCacheConfig {
1548 capacity: 256,
1549 shards: 4,
1550 eviction_config: LruConfig::default(),
1551 hash_builder: Default::default(),
1552 weighter: Arc::new(|_, _| 1),
1553 filter: Arc::new(|_, _| true),
1554 event_listener: None,
1555 metrics: Arc::new(Metrics::noop()),
1556 })
1557 }
1558
1559 #[expect(clippy::type_complexity)]
1560 fn lfu_cache_for_test(
1561 ) -> RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lfu<u64, u64, TestProperties>>> {
1562 RawCache::new(RawCacheConfig {
1563 capacity: 256,
1564 shards: 4,
1565 eviction_config: LfuConfig::default(),
1566 hash_builder: Default::default(),
1567 weighter: Arc::new(|_, _| 1),
1568 filter: Arc::new(|_, _| true),
1569 event_listener: None,
1570 metrics: Arc::new(Metrics::noop()),
1571 })
1572 }
1573
1574 #[expect(clippy::type_complexity)]
1575 fn sieve_cache_for_test(
1576 ) -> RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Sieve<u64, u64, TestProperties>>> {
1577 RawCache::new(RawCacheConfig {
1578 capacity: 256,
1579 shards: 4,
1580 eviction_config: SieveConfig {},
1581 hash_builder: Default::default(),
1582 weighter: Arc::new(|_, _| 1),
1583 filter: Arc::new(|_, _| true),
1584 event_listener: None,
1585 metrics: Arc::new(Metrics::noop()),
1586 })
1587 }
1588
1589 #[test_log::test]
1590 fn test_insert_phantom() {
1591 let fifo = fifo_cache_for_test();
1592
1593 let e1 = fifo.insert_with_properties(1, 1, TestProperties::default().with_phantom(true));
1594 assert_eq!(fifo.usage(), 0);
1595 drop(e1);
1596 assert_eq!(fifo.usage(), 0);
1597
1598 let e2a = fifo.insert_with_properties(2, 2, TestProperties::default().with_phantom(true));
1599 assert_eq!(fifo.usage(), 0);
1600 assert!(fifo.get(&2).is_none());
1601 assert_eq!(fifo.usage(), 0);
1602 drop(e2a);
1603 assert_eq!(fifo.usage(), 0);
1604
1605 let fifo = fifo_cache_for_test();
1606 fifo.insert(1, 1);
1607 assert_eq!(fifo.usage(), 1);
1608 assert_eq!(fifo.get(&1).unwrap().value(), &1);
1609 let e2 = fifo.insert_with_properties(1, 100, TestProperties::default().with_phantom(true));
1610 assert_eq!(fifo.usage(), 0);
1611 drop(e2);
1612 assert_eq!(fifo.usage(), 0);
1613 assert!(fifo.get(&1).is_none());
1614 }
1615
1616 #[expect(clippy::type_complexity)]
1617 #[test_log::test]
1618 fn test_insert_filter() {
1619 let fifo: RawCache<
1620 Fifo<u64, u64, TestProperties>,
1621 ModHasher,
1622 HashTableIndexer<Fifo<u64, u64, TestProperties>>,
1623 > = RawCache::new(RawCacheConfig {
1624 capacity: 256,
1625 shards: 4,
1626 eviction_config: FifoConfig::default(),
1627 hash_builder: Default::default(),
1628 weighter: Arc::new(|_, _| 1),
1629 filter: Arc::new(|k, _| !matches!(*k, 42)),
1630 event_listener: None,
1631 metrics: Arc::new(Metrics::noop()),
1632 });
1633
1634 fifo.insert(1, 1);
1635 fifo.insert(2, 2);
1636 fifo.insert(42, 42);
1637 assert_eq!(fifo.usage(), 2);
1638 assert_eq!(fifo.get(&1).unwrap().value(), &1);
1639 assert_eq!(fifo.get(&2).unwrap().value(), &2);
1640 assert!(fifo.get(&42).is_none());
1641 }
1642
1643 #[test]
1644 fn test_evict_all() {
1645 let pipe = Arc::new(PiecePipe::default());
1646
1647 let fifo = fifo_cache_for_test().with_pipe(pipe.clone());
1648 for i in 0..fifo.capacity() as _ {
1649 fifo.insert(i, i);
1650 }
1651 assert_eq!(fifo.usage(), fifo.capacity());
1652
1653 fifo.evict_all();
1654 let mut pieces = pipe
1655 .pieces()
1656 .iter()
1657 .map(|p| (p.hash(), *p.key(), *p.value()))
1658 .collect_vec();
1659 pieces.sort_by_key(|t| t.0);
1660 let expected = (0..fifo.capacity() as u64).map(|i| (i, i, i)).collect_vec();
1661 assert_eq!(pieces, expected);
1662 }
1663
1664 #[test]
1665 fn test_insert_size_over_capacity() {
1666 let cache: RawCache<Fifo<Vec<u8>, Vec<u8>, TestProperties>, ModHasher, HashTableIndexer<_>> =
1667 RawCache::new(RawCacheConfig {
1668 capacity: 4 * 1024, shards: 1,
1670 eviction_config: FifoConfig::default(),
1671 hash_builder: Default::default(),
1672 weighter: Arc::new(|k, v| k.len() + v.len()),
1673 filter: Arc::new(|_, _| true),
1674 event_listener: None,
1675 metrics: Arc::new(Metrics::noop()),
1676 });
1677
1678 let key = vec![b'k'; 1024]; let value = vec![b'v'; 5 * 1024]; cache.insert(key.clone(), value.clone());
1682 assert_eq!(cache.usage(), 6 * 1024);
1683 assert_eq!(cache.get(&key).unwrap().value(), &value);
1684 }
1685
1686 #[test]
1687 fn test_capacity_distribution_without_loss() {
1688 let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1689 RawCache::new(RawCacheConfig {
1690 capacity: 3,
1691 shards: 2,
1692 eviction_config: FifoConfig::default(),
1693 hash_builder: Default::default(),
1694 weighter: Arc::new(|_, _| 1),
1695 filter: Arc::new(|_, _| true),
1696 event_listener: None,
1697 metrics: Arc::new(Metrics::noop()),
1698 });
1699
1700 for key in 0..3 {
1701 let entry = cache.insert(key, key);
1702 drop(entry);
1703 }
1704
1705 assert_eq!(cache.usage(), 3);
1706
1707 for key in 0..3 {
1708 let entry = cache.get(&key).expect("entry should exist");
1709 assert_eq!(*entry, key);
1710 drop(entry);
1711 }
1712 }
1713
1714 #[test]
1715 fn test_capacity_distribution_with_more_shards_than_capacity() {
1716 let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1717 RawCache::new(RawCacheConfig {
1718 capacity: 2,
1719 shards: 4,
1720 eviction_config: FifoConfig::default(),
1721 hash_builder: Default::default(),
1722 weighter: Arc::new(|_, _| 1),
1723 filter: Arc::new(|_, _| true),
1724 event_listener: None,
1725 metrics: Arc::new(Metrics::noop()),
1726 });
1727
1728 for key in 0..2 {
1729 let entry = cache.insert(key, key);
1730 drop(entry);
1731 }
1732
1733 assert_eq!(cache.usage(), 2);
1734
1735 for key in 0..2 {
1736 let entry = cache.get(&key).expect("entry should exist");
1737 assert_eq!(*entry, key);
1738 drop(entry);
1739 }
1740
1741 assert!(cache.get(&2).is_none());
1742 }
1743
1744 fn test_resize<E>(cache: &RawCache<E, ModHasher, HashTableIndexer<E>>)
1745 where
1746 E: Eviction<Key = u64, Value = u64>,
1747 {
1748 let capacity = cache.capacity();
1749 for i in 0..capacity as u64 * 2 {
1750 cache.insert(i, i);
1751 }
1752 assert_eq!(cache.usage(), capacity);
1753 cache.resize(capacity / 2).unwrap();
1754 assert_eq!(cache.usage(), capacity / 2);
1755 for i in 0..capacity as u64 * 2 {
1756 cache.insert(i, i);
1757 }
1758 assert_eq!(cache.usage(), capacity / 2);
1759 }
1760
1761 #[test]
1762 fn test_fifo_cache_resize() {
1763 let cache = fifo_cache_for_test();
1764 test_resize(&cache);
1765 }
1766
1767 #[test]
1768 fn test_s3fifo_cache_resize() {
1769 let cache = s3fifo_cache_for_test();
1770 test_resize(&cache);
1771 }
1772
1773 #[test]
1774 fn test_lru_cache_resize() {
1775 let cache = lru_cache_for_test();
1776 test_resize(&cache);
1777 }
1778
1779 #[test]
1780 fn test_lfu_cache_resize() {
1781 let cache = lfu_cache_for_test();
1782 test_resize(&cache);
1783 }
1784
1785 #[test]
1786 fn test_sieve_cache_resize() {
1787 let cache = sieve_cache_for_test();
1788 test_resize(&cache);
1789 }
1790
1791 mod fuzzy {
1792 use foyer_common::properties::Hint;
1793
1794 use super::*;
1795
1796 fn fuzzy<E, S>(cache: RawCache<E, S, HashTableIndexer<E>>, hints: Vec<Hint>)
1797 where
1798 E: Eviction<Key = u64, Value = u64, Properties = TestProperties>,
1799 S: HashBuilder,
1800 {
1801 let handles = (0..8)
1802 .map(|i| {
1803 let c = cache.clone();
1804 let hints = hints.clone();
1805 std::thread::spawn(move || {
1806 let mut rng = SmallRng::seed_from_u64(i);
1807 for _ in 0..100000 {
1808 let key = rng.next_u64();
1809 if let Some(entry) = c.get(&key) {
1810 assert_eq!(key, *entry);
1811 drop(entry);
1812 continue;
1813 }
1814 let hint = hints.choose(&mut rng).cloned().unwrap();
1815 c.insert_with_properties(key, key, TestProperties::default().with_hint(hint));
1816 }
1817 })
1818 })
1819 .collect_vec();
1820
1821 handles.into_iter().for_each(|handle| handle.join().unwrap());
1822
1823 assert_eq!(cache.usage(), cache.capacity());
1824 }
1825
1826 #[test_log::test]
1827 fn test_fifo_cache_fuzzy() {
1828 let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1829 RawCache::new(RawCacheConfig {
1830 capacity: 256,
1831 shards: 4,
1832 eviction_config: FifoConfig::default(),
1833 hash_builder: Default::default(),
1834 weighter: Arc::new(|_, _| 1),
1835 filter: Arc::new(|_, _| true),
1836 event_listener: None,
1837 metrics: Arc::new(Metrics::noop()),
1838 });
1839 let hints = vec![Hint::Normal];
1840 fuzzy(cache, hints);
1841 }
1842
1843 #[test_log::test]
1844 fn test_s3fifo_cache_fuzzy() {
1845 let cache: RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1846 RawCache::new(RawCacheConfig {
1847 capacity: 256,
1848 shards: 4,
1849 eviction_config: S3FifoConfig::default(),
1850 hash_builder: Default::default(),
1851 weighter: Arc::new(|_, _| 1),
1852 filter: Arc::new(|_, _| true),
1853 event_listener: None,
1854 metrics: Arc::new(Metrics::noop()),
1855 });
1856 let hints = vec![Hint::Normal];
1857 fuzzy(cache, hints);
1858 }
1859
1860 #[test_log::test]
1861 fn test_lru_cache_fuzzy() {
1862 let cache: RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1863 RawCache::new(RawCacheConfig {
1864 capacity: 256,
1865 shards: 4,
1866 eviction_config: LruConfig::default(),
1867 hash_builder: Default::default(),
1868 weighter: Arc::new(|_, _| 1),
1869 filter: Arc::new(|_, _| true),
1870 event_listener: None,
1871 metrics: Arc::new(Metrics::noop()),
1872 });
1873 let hints = vec![Hint::Normal, Hint::Low];
1874 fuzzy(cache, hints);
1875 }
1876
1877 #[test_log::test]
1878 fn test_lfu_cache_fuzzy() {
1879 let cache: RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1880 RawCache::new(RawCacheConfig {
1881 capacity: 256,
1882 shards: 4,
1883 eviction_config: LfuConfig::default(),
1884 hash_builder: Default::default(),
1885 weighter: Arc::new(|_, _| 1),
1886 filter: Arc::new(|_, _| true),
1887 event_listener: None,
1888 metrics: Arc::new(Metrics::noop()),
1889 });
1890 let hints = vec![Hint::Normal];
1891 fuzzy(cache, hints);
1892 }
1893
1894 #[test_log::test]
1895 fn test_sieve_cache_fuzzy() {
1896 let cache: RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1897 RawCache::new(RawCacheConfig {
1898 capacity: 256,
1899 shards: 4,
1900 eviction_config: SieveConfig {},
1901 hash_builder: Default::default(),
1902 weighter: Arc::new(|_, _| 1),
1903 filter: Arc::new(|_, _| true),
1904 event_listener: None,
1905 metrics: Arc::new(Metrics::noop()),
1906 });
1907 let hints = vec![Hint::Normal];
1908 fuzzy(cache, hints);
1909 }
1910 }
1911}