1use std::{
16 collections::hash_map::{Entry as HashMapEntry, HashMap},
17 fmt::Debug,
18 future::Future,
19 hash::Hash,
20 ops::Deref,
21 pin::Pin,
22 sync::Arc,
23 task::{Context, Poll},
24};
25
26use arc_swap::ArcSwap;
27use equivalent::Equivalent;
28use fastrace::{
29 future::{FutureExt, InSpan},
30 Span,
31};
32use foyer_common::{
33 code::HashBuilder,
34 event::{Event, EventListener},
35 future::{Diversion, DiversionFuture},
36 metrics::Metrics,
37 runtime::SingletonHandle,
38 scope::Scope,
39 strict_assert,
40};
41use itertools::Itertools;
42use parking_lot::{Mutex, RwLock};
43use pin_project::pin_project;
44use tokio::{sync::oneshot, task::JoinHandle};
45
46use crate::{
47 error::{Error, Result},
48 eviction::{Eviction, Op},
49 indexer::{hash_table::HashTableIndexer, sentry::Sentry, Indexer},
50 pipe::NoopPipe,
51 record::{Data, Record},
52 Piece, Pipe,
53};
54
/// A callable that computes the weight of a key/value pair.
///
/// The trait adds no methods of its own: it is a named shorthand for
/// `Fn(&K, &V) -> usize + Send + Sync + 'static`, which makes it usable as a trait object.
pub trait Weighter<K, V>: Fn(&K, &V) -> usize + Send + Sync + 'static {}

/// Blanket implementation: every callable satisfying the bounds is a [`Weighter`].
impl<K, V, F> Weighter<K, V> for F
where
    F: Fn(&K, &V) -> usize + Send + Sync + 'static,
{
}
60
/// Construction parameters for a [`RawCache`].
pub struct RawCacheConfig<E, S>
where
    E: Eviction,
    S: HashBuilder,
{
    /// Total weight budget of the cache. `RawCache::new` splits it evenly across the shards
    /// using integer division.
    pub capacity: usize,
    /// Number of independently locked shards.
    pub shards: usize,
    /// Configuration handed to the eviction algorithm of every shard.
    pub eviction_config: E::Config,
    /// Hasher factory used to hash keys; the resulting hash also selects the shard.
    pub hash_builder: S,
    /// Computes the weight charged for each inserted key/value pair.
    pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
    /// Optional listener notified when entries leave the cache.
    pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
    /// Metrics sink shared by the cache and all of its shards.
    pub metrics: Arc<Metrics>,
}
74
/// State of one cache shard. Shards are wrapped in an `RwLock` by the owning cache.
struct RawCacheShard<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Eviction container; only non-ephemeral records are pushed into it.
    eviction: E,
    /// Key-to-record index of this shard.
    indexer: Sentry<I>,

    /// Sum of the weights of the records currently accounted to this shard.
    usage: usize,
    /// Weight budget of this shard.
    capacity: usize,

    /// In-flight fetches: for every key currently being fetched, the senders of the callers
    /// waiting for the fetched entry.
    #[expect(clippy::type_complexity)]
    waiters: Mutex<HashMap<E::Key, Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>>>,

    /// Metrics sink shared with the owning cache.
    metrics: Arc<Metrics>,
    /// Stored per shard but not read by any code visible in this file.
    _event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
}
93
94impl<E, S, I> RawCacheShard<E, S, I>
95where
96 E: Eviction,
97 S: HashBuilder,
98 I: Indexer<Eviction = E>,
99{
100 fn evict(&mut self, target: usize, garbages: &mut Vec<(Event, Arc<Record<E>>)>) {
102 while self.usage > target {
104 let evicted = match self.eviction.pop() {
105 Some(evicted) => evicted,
106 None => break,
107 };
108 self.metrics.memory_evict.increase(1);
109
110 let e = self.indexer.remove(evicted.hash(), evicted.key()).unwrap();
111 assert_eq!(Arc::as_ptr(&evicted), Arc::as_ptr(&e));
112
113 strict_assert!(!evicted.as_ref().is_in_indexer());
114 strict_assert!(!evicted.as_ref().is_in_eviction());
115
116 self.usage -= evicted.weight();
117
118 garbages.push((Event::Evict, evicted));
119 }
120 }
121
122 fn emplace(
123 &mut self,
124 data: Data<E>,
125 ephemeral: bool,
126 garbages: &mut Vec<(Event, Arc<Record<E>>)>,
127 waiters: &mut Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>,
128 ) -> Arc<Record<E>> {
129 std::mem::swap(waiters, &mut self.waiters.lock().remove(&data.key).unwrap_or_default());
130
131 let weight = data.weight;
132 let old_usage = self.usage;
133
134 let record = Arc::new(Record::new(data));
135
136 self.evict(self.capacity.saturating_sub(weight), garbages);
138
139 if let Some(old) = self.indexer.insert(record.clone()) {
141 self.metrics.memory_replace.increase(1);
142
143 strict_assert!(!old.is_in_indexer());
144
145 if old.is_in_eviction() {
146 self.eviction.remove(&old);
147 }
148 strict_assert!(!old.is_in_eviction());
149
150 self.usage -= old.weight();
151
152 garbages.push((Event::Replace, old));
153 } else {
154 self.metrics.memory_insert.increase(1);
155 }
156 strict_assert!(record.is_in_indexer());
157
158 record.set_ephemeral(ephemeral);
159 if !ephemeral {
160 self.eviction.push(record.clone());
161 strict_assert!(record.is_in_eviction());
162 }
163
164 self.usage += weight;
165 let refs = waiters.len() + 1;
168 let inc = record.inc_refs(refs);
169 assert_eq!(refs, inc);
170
171 match self.usage.cmp(&old_usage) {
172 std::cmp::Ordering::Greater => self.metrics.memory_usage.increase((self.usage - old_usage) as _),
173 std::cmp::Ordering::Less => self.metrics.memory_usage.decrease((old_usage - self.usage) as _),
174 std::cmp::Ordering::Equal => {}
175 }
176
177 record
178 }
179
180 #[fastrace::trace(name = "foyer::memory::raw::shard::remove")]
181 fn remove<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
182 where
183 Q: Hash + Equivalent<E::Key> + ?Sized,
184 {
185 let record = self.indexer.remove(hash, key)?;
186
187 if record.is_in_eviction() {
188 self.eviction.remove(&record);
189 }
190 strict_assert!(!record.is_in_indexer());
191 strict_assert!(!record.is_in_eviction());
192
193 self.usage -= record.weight();
194
195 self.metrics.memory_remove.increase(1);
196 self.metrics.memory_usage.decrease(record.weight() as _);
197
198 record.inc_refs(1);
199
200 Some(record)
201 }
202
203 #[fastrace::trace(name = "foyer::memory::raw::shard::get_noop")]
204 fn get_noop<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
205 where
206 Q: Hash + Equivalent<E::Key> + ?Sized,
207 {
208 self.get_inner(hash, key)
209 }
210
211 #[fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")]
212 fn get_immutable<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
213 where
214 Q: Hash + Equivalent<E::Key> + ?Sized,
215 {
216 self.get_inner(hash, key)
217 .inspect(|record| self.acquire_immutable(record))
218 }
219
220 #[fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")]
221 fn get_mutable<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
222 where
223 Q: Hash + Equivalent<E::Key> + ?Sized,
224 {
225 self.get_inner(hash, key).inspect(|record| self.acquire_mutable(record))
226 }
227
228 #[fastrace::trace(name = "foyer::memory::raw::shard::get_inner")]
229 fn get_inner<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
230 where
231 Q: Hash + Equivalent<E::Key> + ?Sized,
232 {
233 let record = match self.indexer.get(hash, key).cloned() {
234 Some(record) => {
235 self.metrics.memory_hit.increase(1);
236 record
237 }
238 None => {
239 self.metrics.memory_miss.increase(1);
240 return None;
241 }
242 };
243
244 strict_assert!(record.is_in_indexer());
245
246 record.set_ephemeral(false);
247
248 record.inc_refs(1);
249
250 Some(record)
251 }
252
253 #[fastrace::trace(name = "foyer::memory::raw::shard::clear")]
254 fn clear(&mut self, garbages: &mut Vec<Arc<Record<E>>>) {
255 let records = self.indexer.drain().collect_vec();
256 self.eviction.clear();
257
258 let mut count = 0;
259
260 for record in records {
261 count += 1;
262 strict_assert!(!record.is_in_indexer());
263 strict_assert!(!record.is_in_eviction());
264
265 garbages.push(record);
266 }
267
268 self.metrics.memory_remove.increase(count);
269 }
270
271 #[fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")]
272 fn acquire_immutable(&self, record: &Arc<Record<E>>) {
273 match E::acquire() {
274 Op::Immutable(f) => f(&self.eviction, record),
275 _ => unreachable!(),
276 }
277 }
278
279 #[fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")]
280 fn acquire_mutable(&mut self, record: &Arc<Record<E>>) {
281 match E::acquire() {
282 Op::Mutable(mut f) => f(&mut self.eviction, record),
283 _ => unreachable!(),
284 }
285 }
286
287 #[fastrace::trace(name = "foyer::memory::raw::shard::release_immutable")]
288 fn release_immutable(&self, record: &Arc<Record<E>>) {
289 match E::release() {
290 Op::Immutable(f) => f(&self.eviction, record),
291 _ => unreachable!(),
292 }
293 }
294
295 #[fastrace::trace(name = "foyer::memory::raw::shard::release_mutable")]
296 fn release_mutable(&mut self, record: &Arc<Record<E>>) {
297 match E::release() {
298 Op::Mutable(mut f) => f(&mut self.eviction, record),
299 _ => unreachable!(),
300 }
301 }
302
303 #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_noop")]
304 fn fetch_noop(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
305 where
306 E::Key: Clone,
307 {
308 if let Some(record) = self.get_noop(hash, key) {
309 return RawShardFetch::Hit(record);
310 }
311
312 self.fetch_queue(key.clone())
313 }
314
315 #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_immutable")]
316 fn fetch_immutable(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
317 where
318 E::Key: Clone,
319 {
320 if let Some(record) = self.get_immutable(hash, key) {
321 return RawShardFetch::Hit(record);
322 }
323
324 self.fetch_queue(key.clone())
325 }
326
327 #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_mutable")]
328 fn fetch_mutable(&mut self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
329 where
330 E::Key: Clone,
331 {
332 if let Some(record) = self.get_mutable(hash, key) {
333 return RawShardFetch::Hit(record);
334 }
335
336 self.fetch_queue(key.clone())
337 }
338
339 #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_queue")]
340 fn fetch_queue(&self, key: E::Key) -> RawShardFetch<E, S, I> {
341 match self.waiters.lock().entry(key) {
342 HashMapEntry::Occupied(mut o) => {
343 let (tx, rx) = oneshot::channel();
344 o.get_mut().push(tx);
345 self.metrics.memory_queue.increase(1);
346 RawShardFetch::Wait(rx.in_span(Span::enter_with_local_parent(
347 "foyer::memory::raw::fetch_with_runtime::wait",
348 )))
349 }
350 HashMapEntry::Vacant(v) => {
351 v.insert(vec![]);
352 self.metrics.memory_fetch.increase(1);
353 RawShardFetch::Miss
354 }
355 }
356 }
357}
358
/// Shared state behind every [`RawCache`] handle and every [`RawCacheEntry`].
struct RawCacheInner<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// The shards, each behind its own read/write lock. A key's shard is `hash % shards.len()`.
    shards: Vec<RwLock<RawCacheShard<E, S, I>>>,

    /// Total capacity as given at construction time.
    capacity: usize,

    /// Hasher factory used for every key.
    hash_builder: Arc<S>,
    /// Computes the weight of each inserted key/value pair.
    weighter: Arc<dyn Weighter<E::Key, E::Value>>,

    /// Metrics sink.
    metrics: Arc<Metrics>,
    /// Optional listener notified when entries leave the cache.
    event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
    /// Sink that receives evicted records when it reports itself as enabled; swappable at
    /// runtime through `RawCache::set_pipe`.
    pipe: ArcSwap<Box<dyn Pipe<Key = E::Key, Value = E::Value>>>,
}
376
377impl<E, S, I> RawCacheInner<E, S, I>
378where
379 E: Eviction,
380 S: HashBuilder,
381 I: Indexer<Eviction = E>,
382{
383 #[fastrace::trace(name = "foyer::memory::raw::inner::clear")]
384 fn clear(&self) {
385 let mut garbages = vec![];
386
387 self.shards
388 .iter()
389 .map(|shard| shard.write())
390 .for_each(|mut shard| shard.clear(&mut garbages));
391
392 if let Some(listener) = self.event_listener.as_ref() {
394 for record in garbages {
395 listener.on_leave(Event::Clear, record.key(), record.value());
396 }
397 }
398 }
399}
400
/// A sharded in-memory cache, generic over the eviction algorithm `E`, the hash builder `S`
/// and the indexer `I`.
///
/// The handle only holds an `Arc` to the shared state, so cloning it yields another handle to
/// the same cache.
pub struct RawCache<E, S = ahash::RandomState, I = HashTableIndexer<E>>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Shared cache state.
    inner: Arc<RawCacheInner<E, S, I>>,
}
409
410impl<E, S, I> Drop for RawCacheInner<E, S, I>
411where
412 E: Eviction,
413 S: HashBuilder,
414 I: Indexer<Eviction = E>,
415{
416 fn drop(&mut self) {
417 self.clear();
418 }
419}
420
421impl<E, S, I> Clone for RawCache<E, S, I>
422where
423 E: Eviction,
424 S: HashBuilder,
425 I: Indexer<Eviction = E>,
426{
427 fn clone(&self) -> Self {
428 Self {
429 inner: self.inner.clone(),
430 }
431 }
432}
433
434impl<E, S, I> RawCache<E, S, I>
435where
436 E: Eviction,
437 S: HashBuilder,
438 I: Indexer<Eviction = E>,
439{
440 pub fn new(config: RawCacheConfig<E, S>) -> Self {
441 let shard_capacity = config.capacity / config.shards;
442
443 let shards = (0..config.shards)
444 .map(|_| RawCacheShard {
445 eviction: E::new(shard_capacity, &config.eviction_config),
446 indexer: Sentry::default(),
447 usage: 0,
448 capacity: shard_capacity,
449 waiters: Mutex::default(),
450 metrics: config.metrics.clone(),
451 _event_listener: config.event_listener.clone(),
452 })
453 .map(RwLock::new)
454 .collect_vec();
455
456 let pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value>> = Box::new(NoopPipe::default());
457
458 let inner = RawCacheInner {
459 shards,
460 capacity: config.capacity,
461 hash_builder: Arc::new(config.hash_builder),
462 weighter: config.weighter,
463 metrics: config.metrics,
464 event_listener: config.event_listener,
465 pipe: ArcSwap::new(Arc::new(pipe)),
466 };
467
468 Self { inner: Arc::new(inner) }
469 }
470
471 #[fastrace::trace(name = "foyer::memory::raw::resize")]
472 pub fn resize(&self, capacity: usize) -> Result<()> {
473 let shards = self.inner.shards.len();
474 let shard_capacity = capacity / shards;
475
476 let handles = (0..shards)
477 .map(|i| {
478 let inner = self.inner.clone();
479 std::thread::spawn(move || {
480 let mut garbages = vec![];
481 let res = inner.shards[i].write().with(|mut shard| {
482 shard.eviction.update(shard_capacity, None).inspect(|_| {
483 shard.capacity = shard_capacity;
484 shard.evict(shard_capacity, &mut garbages)
485 })
486 });
487 let pipe = inner.pipe.load();
489 let piped = pipe.is_enabled();
490 if inner.event_listener.is_some() || piped {
491 for (event, record) in garbages {
492 if let Some(listener) = inner.event_listener.as_ref() {
493 listener.on_leave(event, record.key(), record.value())
494 }
495 if piped && event == Event::Evict {
496 pipe.send(Piece::new(record));
497 }
498 }
499 }
500 res
501 })
502 })
503 .collect_vec();
504
505 let errs = handles
506 .into_iter()
507 .map(|handle| handle.join().unwrap())
508 .filter(|res| res.is_err())
509 .map(|res| res.unwrap_err())
510 .collect_vec();
511 if !errs.is_empty() {
512 return Err(Error::multiple(errs));
513 }
514
515 Ok(())
516 }
517
518 #[fastrace::trace(name = "foyer::memory::raw::insert")]
519 pub fn insert(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
520 self.insert_with_hint(key, value, Default::default())
521 }
522
523 #[fastrace::trace(name = "foyer::memory::raw::insert_with_hint")]
524 pub fn insert_with_hint(&self, key: E::Key, value: E::Value, hint: E::Hint) -> RawCacheEntry<E, S, I> {
525 self.emplace(key, value, hint, false)
526 }
527
528 #[fastrace::trace(name = "foyer::memory::raw::insert_ephemeral")]
529 pub fn insert_ephemeral(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
530 self.insert_ephemeral_with_hint(key, value, Default::default())
531 }
532
533 #[fastrace::trace(name = "foyer::memory::raw::insert_ephemeral_with_hint")]
534 pub fn insert_ephemeral_with_hint(&self, key: E::Key, value: E::Value, hint: E::Hint) -> RawCacheEntry<E, S, I> {
535 self.emplace(key, value, hint, true)
536 }
537
538 #[fastrace::trace(name = "foyer::memory::raw::emplace")]
539 fn emplace(&self, key: E::Key, value: E::Value, hint: E::Hint, ephemeral: bool) -> RawCacheEntry<E, S, I> {
540 let hash = self.inner.hash_builder.hash_one(&key);
541 let weight = (self.inner.weighter)(&key, &value);
542
543 let mut garbages = vec![];
544 let mut waiters = vec![];
545
546 let record = self.inner.shards[self.shard(hash)].write().with(|mut shard| {
547 shard.emplace(
548 Data {
549 key,
550 value,
551 hint,
552 hash,
553 weight,
554 },
555 ephemeral,
556 &mut garbages,
557 &mut waiters,
558 )
559 });
560
561 for waiter in waiters {
563 let _ = waiter.send(RawCacheEntry {
564 record: record.clone(),
565 inner: self.inner.clone(),
566 });
567 }
568
569 let pipe = self.inner.pipe.load();
571 let piped = pipe.is_enabled();
572 if self.inner.event_listener.is_some() || piped {
573 for (event, record) in garbages {
574 if let Some(listener) = self.inner.event_listener.as_ref() {
575 listener.on_leave(event, record.key(), record.value())
576 }
577 if piped && event == Event::Evict {
578 pipe.send(Piece::new(record));
579 }
580 }
581 }
582
583 RawCacheEntry {
584 record,
585 inner: self.inner.clone(),
586 }
587 }
588
589 #[fastrace::trace(name = "foyer::memory::raw::remove")]
590 pub fn remove<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
591 where
592 Q: Hash + Equivalent<E::Key> + ?Sized,
593 {
594 let hash = self.inner.hash_builder.hash_one(key);
595
596 self.inner.shards[self.shard(hash)]
597 .write()
598 .with(|mut shard| {
599 shard.remove(hash, key).map(|record| RawCacheEntry {
600 inner: self.inner.clone(),
601 record,
602 })
603 })
604 .inspect(|record| {
605 if let Some(listener) = self.inner.event_listener.as_ref() {
607 listener.on_leave(Event::Remove, record.key(), record.value());
608 }
609 })
610 }
611
612 #[fastrace::trace(name = "foyer::memory::raw::get")]
613 pub fn get<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
614 where
615 Q: Hash + Equivalent<E::Key> + ?Sized,
616 {
617 let hash = self.inner.hash_builder.hash_one(key);
618
619 let record = match E::acquire() {
620 Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
621 Op::Immutable(_) => self.inner.shards[self.shard(hash)]
622 .read()
623 .with(|shard| shard.get_immutable(hash, key)),
624 Op::Mutable(_) => self.inner.shards[self.shard(hash)]
625 .write()
626 .with(|mut shard| shard.get_mutable(hash, key)),
627 }?;
628
629 Some(RawCacheEntry {
630 inner: self.inner.clone(),
631 record,
632 })
633 }
634
635 #[fastrace::trace(name = "foyer::memory::raw::contains")]
636 pub fn contains<Q>(&self, key: &Q) -> bool
637 where
638 Q: Hash + Equivalent<E::Key> + ?Sized,
639 {
640 let hash = self.inner.hash_builder.hash_one(key);
641
642 self.inner.shards[self.shard(hash)]
643 .read()
644 .with(|shard| shard.indexer.get(hash, key).is_some())
645 }
646
647 #[fastrace::trace(name = "foyer::memory::raw::touch")]
648 pub fn touch<Q>(&self, key: &Q) -> bool
649 where
650 Q: Hash + Equivalent<E::Key> + ?Sized,
651 {
652 let hash = self.inner.hash_builder.hash_one(key);
653
654 match E::acquire() {
655 Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
656 Op::Immutable(_) => self.inner.shards[self.shard(hash)]
657 .read()
658 .with(|shard| shard.get_immutable(hash, key)),
659 Op::Mutable(_) => self.inner.shards[self.shard(hash)]
660 .write()
661 .with(|mut shard| shard.get_mutable(hash, key)),
662 }
663 .is_some()
664 }
665
666 #[fastrace::trace(name = "foyer::memory::raw::clear")]
667 pub fn clear(&self) {
668 self.inner.clear();
669 }
670
671 pub fn capacity(&self) -> usize {
672 self.inner.capacity
673 }
674
675 pub fn usage(&self) -> usize {
676 self.inner.shards.iter().map(|shard| shard.read().usage).sum()
677 }
678
679 pub fn metrics(&self) -> &Metrics {
680 &self.inner.metrics
681 }
682
683 pub fn hash_builder(&self) -> &Arc<S> {
684 &self.inner.hash_builder
685 }
686
687 pub fn shards(&self) -> usize {
688 self.inner.shards.len()
689 }
690
691 pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value>>) {
692 self.inner.pipe.store(Arc::new(pipe));
693 }
694
695 fn shard(&self, hash: u64) -> usize {
696 hash as usize % self.inner.shards.len()
697 }
698}
699
/// A handle to a cached record.
///
/// Each handle accounts for one reference on the record's reference counter: cloning a handle
/// increments it and dropping a handle decrements it.
pub struct RawCacheEntry<E, S = ahash::RandomState, I = HashTableIndexer<E>>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// The cache this entry belongs to; used on drop to reach the record's shard.
    inner: Arc<RawCacheInner<E, S, I>>,
    /// The record this handle refers to.
    record: Arc<Record<E>>,
}
709
710impl<E, S, I> Debug for RawCacheEntry<E, S, I>
711where
712 E: Eviction,
713 S: HashBuilder,
714 I: Indexer<Eviction = E>,
715{
716 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
717 f.debug_struct("RawCacheEntry").field("record", &self.record).finish()
718 }
719}
720
721impl<E, S, I> Drop for RawCacheEntry<E, S, I>
722where
723 E: Eviction,
724 S: HashBuilder,
725 I: Indexer<Eviction = E>,
726{
727 fn drop(&mut self) {
728 let hash = self.record.hash();
729 let shard = &self.inner.shards[hash as usize % self.inner.shards.len()];
730
731 if self.record.dec_refs(1) == 0 {
732 match E::release() {
733 Op::Noop => {}
734 Op::Immutable(_) => shard.read().with(|shard| shard.release_immutable(&self.record)),
735 Op::Mutable(_) => shard.write().with(|mut shard| shard.release_mutable(&self.record)),
736 }
737
738 if self.record.is_ephemeral() {
739 shard
740 .write()
741 .with(|mut shard| shard.remove(hash, self.key()))
742 .inspect(|record| {
743 if let Some(listener) = self.inner.event_listener.as_ref() {
745 listener.on_leave(Event::Remove, record.key(), record.value());
746 }
747 });
748 }
749 }
750 }
751}
752
753impl<E, S, I> Clone for RawCacheEntry<E, S, I>
754where
755 E: Eviction,
756 S: HashBuilder,
757 I: Indexer<Eviction = E>,
758{
759 fn clone(&self) -> Self {
760 self.record.inc_refs(1);
761 Self {
762 inner: self.inner.clone(),
763 record: self.record.clone(),
764 }
765 }
766}
767
768impl<E, S, I> Deref for RawCacheEntry<E, S, I>
769where
770 E: Eviction,
771 S: HashBuilder,
772 I: Indexer<Eviction = E>,
773{
774 type Target = E::Value;
775
776 fn deref(&self) -> &Self::Target {
777 self.value()
778 }
779}
780
// SAFETY: NOTE(review) — this impl states no `Send` bounds on `E`, `S` or `I`. It presumably
// relies on the `Eviction`/`HashBuilder`/`Indexer` traits constraining their types and on all
// shard state being reached through locks; confirm against those trait definitions.
unsafe impl<E, S, I> Send for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
}
788
// SAFETY: NOTE(review) — this impl states no `Sync` bounds on `E`, `S` or `I`. It presumably
// relies on the `Eviction`/`HashBuilder`/`Indexer` traits constraining their types and on all
// shard state being reached through locks; confirm against those trait definitions.
unsafe impl<E, S, I> Sync for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
}
796
impl<E, S, I> RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Returns the hash stored with the record.
    pub fn hash(&self) -> u64 {
        self.record.hash()
    }

    /// Returns the key of the entry.
    pub fn key(&self) -> &E::Key {
        self.record.key()
    }

    /// Returns the value of the entry.
    pub fn value(&self) -> &E::Value {
        self.record.value()
    }

    /// Returns the eviction hint the entry was inserted with.
    pub fn hint(&self) -> &E::Hint {
        self.record.hint()
    }

    /// Returns the weight of the entry.
    pub fn weight(&self) -> usize {
        self.record.weight()
    }

    /// Returns the current value of the record's reference counter.
    pub fn refs(&self) -> usize {
        self.record.refs()
    }

    /// Returns `true` if the record is no longer in the indexer, i.e. it has been evicted,
    /// replaced, removed or cleared since this handle was obtained.
    pub fn is_outdated(&self) -> bool {
        !self.record.is_in_indexer()
    }

    /// Creates a [`Piece`] that shares this entry's record.
    pub fn piece(&self) -> Piece<E::Key, E::Value> {
        Piece::new(self.record.clone())
    }
}
835
/// How a fetch request was resolved at the time it was issued.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchState {
    /// The key was already cached.
    Hit,
    /// Another fetch for the same key is in flight; this request waits for its result.
    Wait,
    /// The key was not cached and no fetch was in flight; this request performs the fetch.
    Miss,
}
846
/// Marker type used as the "store" parameter of the `Diversion` values produced by fetches.
pub struct FetchMark;
849
/// Outcome of a shard-level fetch attempt.
enum RawShardFetch<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// The record was found in the shard.
    Hit(Arc<Record<E>>),
    /// A fetch for the key is already in flight; the receiver yields the entry once inserted.
    Wait(InSpan<oneshot::Receiver<RawCacheEntry<E, S, I>>>),
    /// Nothing cached and nothing in flight; the caller has to fetch the value.
    Miss,
}
860
/// Future returned by the fetch APIs of [`RawCache`]; wraps [`RawFetchInner`] in a
/// `DiversionFuture` whose target is `Result<RawCacheEntry, ER>`.
pub type RawFetch<E, ER, S = ahash::RandomState, I = HashTableIndexer<E>> =
    DiversionFuture<RawFetchInner<E, ER, S, I>, std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchMark>;
863
/// Payload of [`RawFetchInner::Hit`]; the option is taken when the future is polled.
type RawFetchHit<E, S, I> = Option<RawCacheEntry<E, S, I>>;
/// Payload of [`RawFetchInner::Wait`]: receiver for the entry produced by another fetch.
type RawFetchWait<E, S, I> = InSpan<oneshot::Receiver<RawCacheEntry<E, S, I>>>;
/// Payload of [`RawFetchInner::Miss`]: join handle of the spawned fetch task.
type RawFetchMiss<E, I, S, ER, DFS> = JoinHandle<Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, DFS>>;
867
/// The three ways a fetch can be resolved, as a pollable state.
#[pin_project(project = RawFetchInnerProj)]
pub enum RawFetchInner<E, ER, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// The entry was already cached and is ready immediately.
    Hit(RawFetchHit<E, S, I>),
    /// Waiting for the entry produced by a fetch that is already in flight.
    Wait(#[pin] RawFetchWait<E, S, I>),
    /// Waiting for the fetch task spawned by this request.
    Miss(#[pin] RawFetchMiss<E, I, S, ER, FetchMark>),
}
879
880impl<E, ER, S, I> RawFetchInner<E, ER, S, I>
881where
882 E: Eviction,
883 S: HashBuilder,
884 I: Indexer<Eviction = E>,
885{
886 pub fn state(&self) -> FetchState {
887 match self {
888 RawFetchInner::Hit(_) => FetchState::Hit,
889 RawFetchInner::Wait(_) => FetchState::Wait,
890 RawFetchInner::Miss(_) => FetchState::Miss,
891 }
892 }
893}
894
impl<E, ER, S, I> Future for RawFetchInner<E, ER, S, I>
where
    E: Eviction,
    ER: From<oneshot::error::RecvError>,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    type Output = Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchMark>;

    /// Drives the fetch to completion.
    ///
    /// # Panics
    ///
    /// Panics if a `Hit` is polled again after it has completed (its entry was already taken),
    /// or if the spawned fetch task of a `Miss` failed to join.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.project() {
            // Ready on the first poll; the entry is moved out of the option.
            RawFetchInnerProj::Hit(opt) => Poll::Ready(Ok(opt.take().unwrap()).into()),
            // A receive error (the sender was dropped) is converted into the caller's error type.
            RawFetchInnerProj::Wait(waiter) => waiter.poll(cx).map_err(|err| err.into()).map(Diversion::from),
            // The task already yields a `Diversion`; only the join result is unwrapped.
            RawFetchInnerProj::Miss(handle) => handle.poll(cx).map(|join| join.unwrap()),
        }
    }
}
912
impl<E, S, I> RawCache<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
    E::Key: Clone,
{
    /// Returns the entry for `key`, calling `fetch` to produce the value if it is not cached.
    ///
    /// Uses the default hint and the current tokio runtime (`Handle::current()`); see
    /// [`RawCache::fetch_inner`] for the resolution rules.
    #[fastrace::trace(name = "foyer::memory::raw::fetch")]
    pub fn fetch<F, FU, ER>(&self, key: E::Key, fetch: F) -> RawFetch<E, ER, S, I>
    where
        F: FnOnce() -> FU,
        FU: Future<Output = std::result::Result<E::Value, ER>> + Send + 'static,
        ER: Send + 'static + Debug,
    {
        self.fetch_inner(
            key,
            Default::default(),
            fetch,
            &tokio::runtime::Handle::current().into(),
        )
    }

    /// Same as [`RawCache::fetch`], but a fetched value is inserted with the given `hint`.
    #[fastrace::trace(name = "foyer::memory::raw::fetch_with_hint")]
    pub fn fetch_with_hint<F, FU, ER>(&self, key: E::Key, hint: E::Hint, fetch: F) -> RawFetch<E, ER, S, I>
    where
        F: FnOnce() -> FU,
        FU: Future<Output = std::result::Result<E::Value, ER>> + Send + 'static,
        ER: Send + 'static + Debug,
    {
        self.fetch_inner(key, hint, fetch, &tokio::runtime::Handle::current().into())
    }

    /// Core fetch implementation.
    ///
    /// * Hit: the cached entry is returned in an already-resolved future.
    /// * Wait: a fetch for `key` is in flight; the returned future resolves when that fetch
    ///   inserts its value. `fetch` is not called.
    /// * Miss: `fetch` is called and its future is spawned on `runtime`. On success the value is
    ///   inserted with `hint`, which also hands an entry to every waiter. On error the waiter
    ///   list of the key is dropped, so waiting futures observe a receive error.
    #[doc(hidden)]
    #[fastrace::trace(name = "foyer::memory::raw::fetch_inner")]
    pub fn fetch_inner<F, FU, ER, ID>(
        &self,
        key: E::Key,
        hint: E::Hint,
        fetch: F,
        runtime: &SingletonHandle,
    ) -> RawFetch<E, ER, S, I>
    where
        F: FnOnce() -> FU,
        FU: Future<Output = ID> + Send + 'static,
        ER: Send + 'static + Debug,
        ID: Into<Diversion<std::result::Result<E::Value, ER>, FetchMark>>,
    {
        let hash = self.inner.hash_builder.hash_one(&key);

        // Lock mode follows what the eviction algorithm's `acquire` operation needs.
        let raw = match E::acquire() {
            Op::Noop => self.inner.shards[self.shard(hash)].read().fetch_noop(hash, &key),
            Op::Immutable(_) => self.inner.shards[self.shard(hash)].read().fetch_immutable(hash, &key),
            Op::Mutable(_) => self.inner.shards[self.shard(hash)].write().fetch_mutable(hash, &key),
        };

        match raw {
            RawShardFetch::Hit(record) => {
                return RawFetch::new(RawFetchInner::Hit(Some(RawCacheEntry {
                    record,
                    inner: self.inner.clone(),
                })))
            }
            RawShardFetch::Wait(future) => return RawFetch::new(RawFetchInner::Wait(future)),
            // This caller registered the waiter list and is responsible for fetching.
            RawShardFetch::Miss => {}
        }

        let cache = self.clone();
        let future = fetch();
        let join = runtime.spawn(
            async move {
                let Diversion { target, store } = future
                    .in_span(Span::enter_with_local_parent("foyer::memory::raw::fetch_inner::fn"))
                    .await
                    .into();
                let value = match target {
                    Ok(value) => value,
                    Err(e) => {
                        // Dropping the waiter list drops its senders, waking every waiter with
                        // a receive error.
                        cache.inner.shards[cache.shard(hash)].read().waiters.lock().remove(&key);
                        tracing::debug!("[fetch]: error raise while fetching, all waiter are dropped, err: {e:?}");
                        return Diversion { target: Err(e), store };
                    }
                };
                // Insertion takes over the waiter list and sends each waiter an entry.
                let entry = cache.insert_with_hint(key, value, hint);
                Diversion {
                    target: Ok(entry),
                    store,
                }
            }
            .in_span(Span::enter_with_local_parent(
                "foyer::memory::generic::fetch_with_runtime::spawn",
            )),
        );

        RawFetch::new(RawFetchInner::Miss(join))
    }
}
1010
#[cfg(test)]
mod tests {
    use foyer_common::hasher::ModRandomState;
    use rand::{rngs::SmallRng, seq::IndexedRandom, RngCore, SeedableRng};

    use super::*;
    use crate::eviction::{
        fifo::{Fifo, FifoConfig, FifoHint},
        lfu::{Lfu, LfuConfig, LfuHint},
        lru::{Lru, LruConfig, LruHint},
        s3fifo::{S3Fifo, S3FifoConfig, S3FifoHint},
    };

    /// Compile-time probe: instantiating it proves `T: Send + Sync + 'static`.
    fn is_send_sync_static<T>()
    where
        T: Send + Sync + 'static,
    {
    }

    /// The cache must be shareable across threads for every eviction algorithm.
    #[test]
    fn test_send_sync_static() {
        is_send_sync_static::<RawCache<Fifo<(), ()>>>();
        is_send_sync_static::<RawCache<S3Fifo<(), ()>>>();
        is_send_sync_static::<RawCache<Lfu<(), ()>>>();
        is_send_sync_static::<RawCache<Lru<(), ()>>>();
    }

    /// FIFO cache with capacity 256, 4 shards and unit weights.
    fn fifo_cache_for_test() -> RawCache<Fifo<u64, u64>, ModRandomState, HashTableIndexer<Fifo<u64, u64>>> {
        let config: RawCacheConfig<Fifo<u64, u64>, ModRandomState> = RawCacheConfig {
            capacity: 256,
            shards: 4,
            eviction_config: FifoConfig::default(),
            hash_builder: Default::default(),
            weighter: Arc::new(|_, _| 1),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        };
        RawCache::new(config)
    }

    /// S3-FIFO cache with capacity 256, 4 shards and unit weights.
    fn s3fifo_cache_for_test() -> RawCache<S3Fifo<u64, u64>, ModRandomState, HashTableIndexer<S3Fifo<u64, u64>>> {
        let config: RawCacheConfig<S3Fifo<u64, u64>, ModRandomState> = RawCacheConfig {
            capacity: 256,
            shards: 4,
            eviction_config: S3FifoConfig::default(),
            hash_builder: Default::default(),
            weighter: Arc::new(|_, _| 1),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        };
        RawCache::new(config)
    }

    /// LRU cache with capacity 256, 4 shards and unit weights.
    fn lru_cache_for_test() -> RawCache<Lru<u64, u64>, ModRandomState, HashTableIndexer<Lru<u64, u64>>> {
        let config: RawCacheConfig<Lru<u64, u64>, ModRandomState> = RawCacheConfig {
            capacity: 256,
            shards: 4,
            eviction_config: LruConfig::default(),
            hash_builder: Default::default(),
            weighter: Arc::new(|_, _| 1),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        };
        RawCache::new(config)
    }

    /// LFU cache with capacity 256, 4 shards and unit weights.
    fn lfu_cache_for_test() -> RawCache<Lfu<u64, u64>, ModRandomState, HashTableIndexer<Lfu<u64, u64>>> {
        let config: RawCacheConfig<Lfu<u64, u64>, ModRandomState> = RawCacheConfig {
            capacity: 256,
            shards: 4,
            eviction_config: LfuConfig::default(),
            hash_builder: Default::default(),
            weighter: Arc::new(|_, _| 1),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        };
        RawCache::new(config)
    }

    /// An ephemeral entry disappears with its last handle unless it was looked up meanwhile.
    #[test]
    fn test_insert_ephemeral() {
        let cache = fifo_cache_for_test();

        // Never read: dropping the only handle removes the entry.
        let first = cache.insert_ephemeral(1, 1);
        assert_eq!(cache.usage(), 1);
        drop(first);
        assert_eq!(cache.usage(), 0);

        // Read once: the entry survives the drop of all of its handles.
        let inserted = cache.insert_ephemeral(2, 2);
        assert_eq!(cache.usage(), 1);
        let looked_up = cache.get(&2).expect("entry 2 should exist");
        drop(inserted);
        assert_eq!(cache.usage(), 1);
        drop(looked_up);
        assert_eq!(cache.usage(), 1);
    }

    /// Fills the cache, halves its capacity, and checks that usage follows the new capacity
    /// both right after the resize and after refilling.
    fn test_resize<E>(cache: &RawCache<E, ModRandomState, HashTableIndexer<E>>)
    where
        E: Eviction<Key = u64, Value = u64>,
    {
        let full = cache.capacity();
        let half = full / 2;
        let overfill = || {
            for i in 0..full as u64 * 2 {
                cache.insert(i, i);
            }
        };

        overfill();
        assert_eq!(cache.usage(), full);

        cache.resize(half).unwrap();
        assert_eq!(cache.usage(), half);

        overfill();
        assert_eq!(cache.usage(), half);
    }

    #[test]
    fn test_fifo_cache_resize() {
        test_resize(&fifo_cache_for_test());
    }

    #[test]
    fn test_s3fifo_cache_resize() {
        test_resize(&s3fifo_cache_for_test());
    }

    #[test]
    fn test_lru_cache_resize() {
        test_resize(&lru_cache_for_test());
    }

    #[test]
    fn test_lfu_cache_resize() {
        test_resize(&lfu_cache_for_test());
    }

    mod fuzzy {
        use super::*;

        /// Hammers `cache` from 8 threads with seeded random keys: a hit must return the key as
        /// its value, a miss inserts the key with a randomly chosen hint. Afterwards the cache
        /// must be exactly full.
        fn fuzzy<E>(cache: RawCache<E>, hints: Vec<E::Hint>)
        where
            E: Eviction<Key = u64, Value = u64>,
        {
            let workers = (0..8)
                .map(|seed| {
                    let shared = cache.clone();
                    let choices = hints.clone();
                    std::thread::spawn(move || {
                        let mut rng = SmallRng::seed_from_u64(seed);
                        for _ in 0..100000 {
                            let key = rng.next_u64();
                            match shared.get(&key) {
                                Some(entry) => assert_eq!(key, *entry),
                                None => {
                                    let hint = choices.choose(&mut rng).cloned().unwrap();
                                    shared.insert_with_hint(key, key, hint);
                                }
                            }
                        }
                    })
                })
                .collect_vec();

            for worker in workers {
                worker.join().unwrap();
            }

            assert_eq!(cache.usage(), cache.capacity());
        }

        #[test_log::test]
        fn test_fifo_cache_fuzzy() {
            let config: RawCacheConfig<Fifo<u64, u64>, ahash::RandomState> = RawCacheConfig {
                capacity: 256,
                shards: 4,
                eviction_config: FifoConfig::default(),
                hash_builder: Default::default(),
                weighter: Arc::new(|_, _| 1),
                event_listener: None,
                metrics: Arc::new(Metrics::noop()),
            };
            let cache: RawCache<Fifo<u64, u64>> = RawCache::new(config);
            fuzzy(cache, vec![FifoHint]);
        }

        #[test_log::test]
        fn test_s3fifo_cache_fuzzy() {
            let config: RawCacheConfig<S3Fifo<u64, u64>, ahash::RandomState> = RawCacheConfig {
                capacity: 256,
                shards: 4,
                eviction_config: S3FifoConfig::default(),
                hash_builder: Default::default(),
                weighter: Arc::new(|_, _| 1),
                event_listener: None,
                metrics: Arc::new(Metrics::noop()),
            };
            let cache: RawCache<S3Fifo<u64, u64>> = RawCache::new(config);
            fuzzy(cache, vec![S3FifoHint]);
        }

        #[test_log::test]
        fn test_lru_cache_fuzzy() {
            let config: RawCacheConfig<Lru<u64, u64>, ahash::RandomState> = RawCacheConfig {
                capacity: 256,
                shards: 4,
                eviction_config: LruConfig::default(),
                hash_builder: Default::default(),
                weighter: Arc::new(|_, _| 1),
                event_listener: None,
                metrics: Arc::new(Metrics::noop()),
            };
            let cache: RawCache<Lru<u64, u64>> = RawCache::new(config);
            fuzzy(cache, vec![LruHint::HighPriority, LruHint::LowPriority]);
        }

        #[test_log::test]
        fn test_lfu_cache_fuzzy() {
            let config: RawCacheConfig<Lfu<u64, u64>, ahash::RandomState> = RawCacheConfig {
                capacity: 256,
                shards: 4,
                eviction_config: LfuConfig::default(),
                hash_builder: Default::default(),
                weighter: Arc::new(|_, _| 1),
                event_listener: None,
                metrics: Arc::new(Metrics::noop()),
            };
            let cache: RawCache<Lfu<u64, u64>> = RawCache::new(config);
            fuzzy(cache, vec![LfuHint]);
        }
    }
}