1use std::{
16 collections::hash_map::{Entry as HashMapEntry, HashMap},
17 fmt::Debug,
18 future::Future,
19 hash::Hash,
20 ops::Deref,
21 pin::Pin,
22 sync::Arc,
23 task::{Context, Poll},
24};
25
26use arc_swap::ArcSwap;
27use equivalent::Equivalent;
28#[cfg(feature = "tracing")]
29use fastrace::{
30 future::{FutureExt, InSpan},
31 Span,
32};
33use foyer_common::{
34 code::HashBuilder,
35 event::{Event, EventListener},
36 future::{Diversion, DiversionFuture},
37 metrics::Metrics,
38 properties::{Location, Properties, Source},
39 runtime::SingletonHandle,
40 strict_assert,
41 utils::scope::Scope,
42};
43use itertools::Itertools;
44use parking_lot::{Mutex, RwLock};
45use pin_project::pin_project;
46use tokio::{sync::oneshot, task::JoinHandle};
47
48use crate::{
49 error::{Error, Result},
50 eviction::{Eviction, Op},
51 indexer::{hash_table::HashTableIndexer, sentry::Sentry, Indexer},
52 pipe::NoopPipe,
53 record::{Data, Record},
54 Piece, Pipe,
55};
56
57pub trait Weighter<K, V>: Fn(&K, &V) -> usize + Send + Sync + 'static {}
61impl<K, V, T> Weighter<K, V> for T where T: Fn(&K, &V) -> usize + Send + Sync + 'static {}
62
63pub struct RawCacheConfig<E, S>
64where
65 E: Eviction,
66 S: HashBuilder,
67{
68 pub capacity: usize,
69 pub shards: usize,
70 pub eviction_config: E::Config,
71 pub hash_builder: S,
72 pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
73 pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
74 pub metrics: Arc<Metrics>,
75}
76
77struct RawCacheShard<E, S, I>
78where
79 E: Eviction,
80 S: HashBuilder,
81 I: Indexer<Eviction = E>,
82{
83 eviction: E,
84 indexer: Sentry<I>,
85
86 usage: usize,
87 capacity: usize,
88
89 #[expect(clippy::type_complexity)]
90 waiters: Mutex<HashMap<E::Key, Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>>>,
91
92 metrics: Arc<Metrics>,
93 _event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
94}
95
96impl<E, S, I> RawCacheShard<E, S, I>
97where
98 E: Eviction,
99 S: HashBuilder,
100 I: Indexer<Eviction = E>,
101{
102 fn evict(&mut self, target: usize, garbages: &mut Vec<(Event, Arc<Record<E>>)>) {
104 while self.usage > target {
106 let evicted = match self.eviction.pop() {
107 Some(evicted) => evicted,
108 None => break,
109 };
110 self.metrics.memory_evict.increase(1);
111
112 let e = self.indexer.remove(evicted.hash(), evicted.key()).unwrap();
113 assert_eq!(Arc::as_ptr(&evicted), Arc::as_ptr(&e));
114
115 strict_assert!(!evicted.as_ref().is_in_indexer());
116 strict_assert!(!evicted.as_ref().is_in_eviction());
117
118 self.usage -= evicted.weight();
119
120 garbages.push((Event::Evict, evicted));
121 }
122 }
123
124 fn emplace(
125 &mut self,
126 data: Data<E>,
127 garbages: &mut Vec<(Event, Arc<Record<E>>)>,
128 waiters: &mut Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>,
129 ) -> Arc<Record<E>> {
130 *waiters = self.waiters.lock().remove(&data.key).unwrap_or_default();
131
132 let weight = data.weight;
133 let old_usage = self.usage;
134
135 let record = Arc::new(Record::new(data));
136
137 self.evict(self.capacity.saturating_sub(weight), garbages);
139
140 if let Some(old) = self.indexer.insert(record.clone()) {
142 self.metrics.memory_replace.increase(1);
143
144 strict_assert!(!old.is_in_indexer());
145
146 if old.is_in_eviction() {
147 self.eviction.remove(&old);
148 }
149 strict_assert!(!old.is_in_eviction());
150
151 self.usage -= old.weight();
152
153 garbages.push((Event::Replace, old));
154 } else {
155 self.metrics.memory_insert.increase(1);
156 }
157 strict_assert!(record.is_in_indexer());
158
159 let ephemeral = record.properties().ephemeral().unwrap_or_default();
160 record.set_ephemeral(ephemeral);
161 if !ephemeral {
162 self.eviction.push(record.clone());
163 strict_assert!(record.is_in_eviction());
164 }
165
166 self.usage += weight;
167 let refs = waiters.len() + 1;
170 let inc = record.inc_refs(refs);
171 assert_eq!(refs, inc);
172
173 match self.usage.cmp(&old_usage) {
174 std::cmp::Ordering::Greater => self.metrics.memory_usage.increase((self.usage - old_usage) as _),
175 std::cmp::Ordering::Less => self.metrics.memory_usage.decrease((old_usage - self.usage) as _),
176 std::cmp::Ordering::Equal => {}
177 }
178
179 record
180 }
181
182 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::remove"))]
183 fn remove<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
184 where
185 Q: Hash + Equivalent<E::Key> + ?Sized,
186 {
187 let record = self.indexer.remove(hash, key)?;
188
189 if record.is_in_eviction() {
190 self.eviction.remove(&record);
191 }
192 strict_assert!(!record.is_in_indexer());
193 strict_assert!(!record.is_in_eviction());
194
195 self.usage -= record.weight();
196
197 self.metrics.memory_remove.increase(1);
198 self.metrics.memory_usage.decrease(record.weight() as _);
199
200 record.inc_refs(1);
201
202 Some(record)
203 }
204
205 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_noop"))]
206 fn get_noop<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
207 where
208 Q: Hash + Equivalent<E::Key> + ?Sized,
209 {
210 self.get_inner(hash, key)
211 }
212
213 #[cfg_attr(
214 feature = "tracing",
215 fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")
216 )]
217 fn get_immutable<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
218 where
219 Q: Hash + Equivalent<E::Key> + ?Sized,
220 {
221 self.get_inner(hash, key)
222 .inspect(|record| self.acquire_immutable(record))
223 }
224
225 #[cfg_attr(
226 feature = "tracing",
227 fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")
228 )]
229 fn get_mutable<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
230 where
231 Q: Hash + Equivalent<E::Key> + ?Sized,
232 {
233 self.get_inner(hash, key).inspect(|record| self.acquire_mutable(record))
234 }
235
236 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_inner"))]
237 fn get_inner<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
238 where
239 Q: Hash + Equivalent<E::Key> + ?Sized,
240 {
241 let record = match self.indexer.get(hash, key).cloned() {
242 Some(record) => {
243 self.metrics.memory_hit.increase(1);
244 record
245 }
246 None => {
247 self.metrics.memory_miss.increase(1);
248 return None;
249 }
250 };
251
252 strict_assert!(record.is_in_indexer());
253
254 record.set_ephemeral(false);
255
256 record.inc_refs(1);
257
258 Some(record)
259 }
260
261 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::clear"))]
262 fn clear(&mut self, garbages: &mut Vec<Arc<Record<E>>>) {
263 let records = self.indexer.drain().collect_vec();
264 self.eviction.clear();
265
266 let mut count = 0;
267
268 for record in records {
269 count += 1;
270 strict_assert!(!record.is_in_indexer());
271 strict_assert!(!record.is_in_eviction());
272
273 garbages.push(record);
274 }
275
276 self.metrics.memory_remove.increase(count);
277 }
278
279 #[cfg_attr(
280 feature = "tracing",
281 fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")
282 )]
283 fn acquire_immutable(&self, record: &Arc<Record<E>>) {
284 match E::acquire() {
285 Op::Immutable(f) => f(&self.eviction, record),
286 _ => unreachable!(),
287 }
288 }
289
290 #[cfg_attr(
291 feature = "tracing",
292 fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")
293 )]
294 fn acquire_mutable(&mut self, record: &Arc<Record<E>>) {
295 match E::acquire() {
296 Op::Mutable(mut f) => f(&mut self.eviction, record),
297 _ => unreachable!(),
298 }
299 }
300
301 #[cfg_attr(
302 feature = "tracing",
303 fastrace::trace(name = "foyer::memory::raw::shard::release_immutable")
304 )]
305 fn release_immutable(&self, record: &Arc<Record<E>>) {
306 match E::release() {
307 Op::Immutable(f) => f(&self.eviction, record),
308 _ => unreachable!(),
309 }
310 }
311
312 #[cfg_attr(
313 feature = "tracing",
314 fastrace::trace(name = "foyer::memory::raw::shard::release_mutable")
315 )]
316 fn release_mutable(&mut self, record: &Arc<Record<E>>) {
317 match E::release() {
318 Op::Mutable(mut f) => f(&mut self.eviction, record),
319 _ => unreachable!(),
320 }
321 }
322
323 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::fetch_noop"))]
324 fn fetch_noop(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
325 where
326 E::Key: Clone,
327 {
328 if let Some(record) = self.get_noop(hash, key) {
329 return RawShardFetch::Hit(record);
330 }
331
332 self.fetch_queue(key.clone())
333 }
334
335 #[cfg_attr(
336 feature = "tracing",
337 fastrace::trace(name = "foyer::memory::raw::shard::fetch_immutable")
338 )]
339 fn fetch_immutable(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
340 where
341 E::Key: Clone,
342 {
343 if let Some(record) = self.get_immutable(hash, key) {
344 return RawShardFetch::Hit(record);
345 }
346
347 self.fetch_queue(key.clone())
348 }
349
350 #[cfg_attr(
351 feature = "tracing",
352 fastrace::trace(name = "foyer::memory::raw::shard::fetch_mutable")
353 )]
354 fn fetch_mutable(&mut self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
355 where
356 E::Key: Clone,
357 {
358 if let Some(record) = self.get_mutable(hash, key) {
359 return RawShardFetch::Hit(record);
360 }
361
362 self.fetch_queue(key.clone())
363 }
364
365 #[cfg_attr(
366 feature = "tracing",
367 fastrace::trace(name = "foyer::memory::raw::shard::fetch_queue")
368 )]
369 fn fetch_queue(&self, key: E::Key) -> RawShardFetch<E, S, I> {
370 match self.waiters.lock().entry(key) {
371 HashMapEntry::Occupied(mut o) => {
372 let (tx, rx) = oneshot::channel();
373 o.get_mut().push(tx);
374 self.metrics.memory_queue.increase(1);
375 #[cfg(feature = "tracing")]
376 let wait = rx.in_span(Span::enter_with_local_parent(
377 "foyer::memory::raw::fetch_with_runtime::wait",
378 ));
379 #[cfg(not(feature = "tracing"))]
380 let wait = rx;
381 RawShardFetch::Wait(wait)
382 }
383 HashMapEntry::Vacant(v) => {
384 v.insert(vec![]);
385 self.metrics.memory_fetch.increase(1);
386 RawShardFetch::Miss
387 }
388 }
389 }
390}
391
392#[expect(clippy::type_complexity)]
393struct RawCacheInner<E, S, I>
394where
395 E: Eviction,
396 S: HashBuilder,
397 I: Indexer<Eviction = E>,
398{
399 shards: Vec<RwLock<RawCacheShard<E, S, I>>>,
400
401 capacity: usize,
402
403 hash_builder: Arc<S>,
404 weighter: Arc<dyn Weighter<E::Key, E::Value>>,
405
406 metrics: Arc<Metrics>,
407 event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
408 pipe: ArcSwap<Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>>,
409}
410
411impl<E, S, I> RawCacheInner<E, S, I>
412where
413 E: Eviction,
414 S: HashBuilder,
415 I: Indexer<Eviction = E>,
416{
417 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::inner::clear"))]
418 fn clear(&self) {
419 let mut garbages = vec![];
420
421 self.shards
422 .iter()
423 .map(|shard| shard.write())
424 .for_each(|mut shard| shard.clear(&mut garbages));
425
426 if let Some(listener) = self.event_listener.as_ref() {
428 for record in garbages {
429 listener.on_leave(Event::Clear, record.key(), record.value());
430 }
431 }
432 }
433}
434
435pub struct RawCache<E, S, I = HashTableIndexer<E>>
436where
437 E: Eviction,
438 S: HashBuilder,
439 I: Indexer<Eviction = E>,
440{
441 inner: Arc<RawCacheInner<E, S, I>>,
442}
443
444impl<E, S, I> Drop for RawCacheInner<E, S, I>
445where
446 E: Eviction,
447 S: HashBuilder,
448 I: Indexer<Eviction = E>,
449{
450 fn drop(&mut self) {
451 self.clear();
452 }
453}
454
455impl<E, S, I> Clone for RawCache<E, S, I>
456where
457 E: Eviction,
458 S: HashBuilder,
459 I: Indexer<Eviction = E>,
460{
461 fn clone(&self) -> Self {
462 Self {
463 inner: self.inner.clone(),
464 }
465 }
466}
467
468impl<E, S, I> RawCache<E, S, I>
469where
470 E: Eviction,
471 S: HashBuilder,
472 I: Indexer<Eviction = E>,
473{
474 pub fn new(config: RawCacheConfig<E, S>) -> Self {
475 let shard_capacity = config.capacity / config.shards;
476
477 let shards = (0..config.shards)
478 .map(|_| RawCacheShard {
479 eviction: E::new(shard_capacity, &config.eviction_config),
480 indexer: Sentry::default(),
481 usage: 0,
482 capacity: shard_capacity,
483 waiters: Mutex::default(),
484 metrics: config.metrics.clone(),
485 _event_listener: config.event_listener.clone(),
486 })
487 .map(RwLock::new)
488 .collect_vec();
489
490 let pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>> =
491 Box::new(NoopPipe::default());
492
493 let inner = RawCacheInner {
494 shards,
495 capacity: config.capacity,
496 hash_builder: Arc::new(config.hash_builder),
497 weighter: config.weighter,
498 metrics: config.metrics,
499 event_listener: config.event_listener,
500 pipe: ArcSwap::new(Arc::new(pipe)),
501 };
502
503 Self { inner: Arc::new(inner) }
504 }
505
506 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::resize"))]
507 pub fn resize(&self, capacity: usize) -> Result<()> {
508 let shards = self.inner.shards.len();
509 let shard_capacity = capacity / shards;
510
511 let handles = (0..shards)
512 .map(|i| {
513 let inner = self.inner.clone();
514 std::thread::spawn(move || {
515 let mut garbages = vec![];
516 let res = inner.shards[i].write().with(|mut shard| {
517 shard.eviction.update(shard_capacity, None).inspect(|_| {
518 shard.capacity = shard_capacity;
519 shard.evict(shard_capacity, &mut garbages)
520 })
521 });
522 let pipe = inner.pipe.load();
524 let piped = pipe.is_enabled();
525 if inner.event_listener.is_some() || piped {
526 for (event, record) in garbages {
527 if let Some(listener) = inner.event_listener.as_ref() {
528 listener.on_leave(event, record.key(), record.value())
529 }
530 if piped && event == Event::Evict {
531 pipe.send(Piece::new(record));
532 }
533 }
534 }
535 res
536 })
537 })
538 .collect_vec();
539
540 let errs = handles
541 .into_iter()
542 .map(|handle| handle.join().unwrap())
543 .filter(|res| res.is_err())
544 .map(|res| res.unwrap_err())
545 .collect_vec();
546 if !errs.is_empty() {
547 return Err(Error::multiple(errs));
548 }
549
550 Ok(())
551 }
552
553 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert"))]
554 pub fn insert(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
555 self.insert_with_properties(key, value, Default::default())
556 }
557
558 #[cfg_attr(
559 feature = "tracing",
560 fastrace::trace(name = "foyer::memory::raw::insert_with_properties")
561 )]
562 pub fn insert_with_properties(
563 &self,
564 key: E::Key,
565 value: E::Value,
566 properties: E::Properties,
567 ) -> RawCacheEntry<E, S, I> {
568 let hash = self.inner.hash_builder.hash_one(&key);
569 let weight = (self.inner.weighter)(&key, &value);
570
571 let mut garbages = vec![];
572 let mut waiters = vec![];
573
574 let record = self.inner.shards[self.shard(hash)].write().with(|mut shard| {
575 shard.emplace(
576 Data {
577 key,
578 value,
579 properties,
580 hash,
581 weight,
582 },
583 &mut garbages,
584 &mut waiters,
585 )
586 });
587
588 for waiter in waiters {
590 let _ = waiter.send(RawCacheEntry {
591 record: record.clone(),
592 inner: self.inner.clone(),
593 });
594 }
595
596 let pipe = self.inner.pipe.load();
598 let piped = pipe.is_enabled();
599 if self.inner.event_listener.is_some() || piped {
600 for (event, record) in garbages {
601 if let Some(listener) = self.inner.event_listener.as_ref() {
602 listener.on_leave(event, record.key(), record.value())
603 }
604 if piped && event == Event::Evict {
605 pipe.send(Piece::new(record));
606 }
607 }
608 }
609
610 RawCacheEntry {
611 record,
612 inner: self.inner.clone(),
613 }
614 }
615
616 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::evict_all"))]
618 pub fn evict_all(&self) {
619 let mut garbages = vec![];
620 for shard in self.inner.shards.iter() {
621 shard.write().evict(0, &mut garbages);
622 }
623
624 let pipe = self.inner.pipe.load();
626 let piped = pipe.is_enabled();
627 if self.inner.event_listener.is_some() || piped {
628 for (event, record) in garbages {
629 if let Some(listener) = self.inner.event_listener.as_ref() {
630 listener.on_leave(event, record.key(), record.value())
631 }
632 if piped && event == Event::Evict {
633 pipe.send(Piece::new(record));
634 }
635 }
636 }
637 }
638
639 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::flush"))]
644 pub async fn flush(&self) {
645 let mut garbages = vec![];
646 for shard in self.inner.shards.iter() {
647 shard.write().evict(0, &mut garbages);
648 }
649
650 let pipe = self.inner.pipe.load();
652 let piped = pipe.is_enabled();
653
654 if let Some(listener) = self.inner.event_listener.as_ref() {
655 for (event, record) in garbages.iter() {
656 listener.on_leave(*event, record.key(), record.value());
657 }
658 }
659 if piped {
660 let pieces = garbages.into_iter().map(|(_, record)| Piece::new(record)).collect_vec();
661 pipe.flush(pieces).await;
662 }
663 }
664
665 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::remove"))]
666 pub fn remove<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
667 where
668 Q: Hash + Equivalent<E::Key> + ?Sized,
669 {
670 let hash = self.inner.hash_builder.hash_one(key);
671
672 self.inner.shards[self.shard(hash)]
673 .write()
674 .with(|mut shard| {
675 shard.remove(hash, key).map(|record| RawCacheEntry {
676 inner: self.inner.clone(),
677 record,
678 })
679 })
680 .inspect(|record| {
681 if let Some(listener) = self.inner.event_listener.as_ref() {
683 listener.on_leave(Event::Remove, record.key(), record.value());
684 }
685 })
686 }
687
688 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get"))]
689 pub fn get<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
690 where
691 Q: Hash + Equivalent<E::Key> + ?Sized,
692 {
693 let hash = self.inner.hash_builder.hash_one(key);
694
695 let record = match E::acquire() {
696 Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
697 Op::Immutable(_) => self.inner.shards[self.shard(hash)]
698 .read()
699 .with(|shard| shard.get_immutable(hash, key)),
700 Op::Mutable(_) => self.inner.shards[self.shard(hash)]
701 .write()
702 .with(|mut shard| shard.get_mutable(hash, key)),
703 }?;
704
705 Some(RawCacheEntry {
706 inner: self.inner.clone(),
707 record,
708 })
709 }
710
711 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::contains"))]
712 pub fn contains<Q>(&self, key: &Q) -> bool
713 where
714 Q: Hash + Equivalent<E::Key> + ?Sized,
715 {
716 let hash = self.inner.hash_builder.hash_one(key);
717
718 self.inner.shards[self.shard(hash)]
719 .read()
720 .with(|shard| shard.indexer.get(hash, key).is_some())
721 }
722
723 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::touch"))]
724 pub fn touch<Q>(&self, key: &Q) -> bool
725 where
726 Q: Hash + Equivalent<E::Key> + ?Sized,
727 {
728 let hash = self.inner.hash_builder.hash_one(key);
729
730 match E::acquire() {
731 Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
732 Op::Immutable(_) => self.inner.shards[self.shard(hash)]
733 .read()
734 .with(|shard| shard.get_immutable(hash, key)),
735 Op::Mutable(_) => self.inner.shards[self.shard(hash)]
736 .write()
737 .with(|mut shard| shard.get_mutable(hash, key)),
738 }
739 .is_some()
740 }
741
742 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::clear"))]
743 pub fn clear(&self) {
744 self.inner.clear();
745 }
746
747 pub fn capacity(&self) -> usize {
748 self.inner.capacity
749 }
750
751 pub fn usage(&self) -> usize {
752 self.inner.shards.iter().map(|shard| shard.read().usage).sum()
753 }
754
755 pub fn metrics(&self) -> &Metrics {
756 &self.inner.metrics
757 }
758
759 pub fn hash_builder(&self) -> &Arc<S> {
760 &self.inner.hash_builder
761 }
762
763 pub fn shards(&self) -> usize {
764 self.inner.shards.len()
765 }
766
767 pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>) {
768 self.inner.pipe.store(Arc::new(pipe));
769 }
770
771 fn shard(&self, hash: u64) -> usize {
772 hash as usize % self.inner.shards.len()
773 }
774}
775
776pub struct RawCacheEntry<E, S, I = HashTableIndexer<E>>
777where
778 E: Eviction,
779 S: HashBuilder,
780 I: Indexer<Eviction = E>,
781{
782 inner: Arc<RawCacheInner<E, S, I>>,
783 record: Arc<Record<E>>,
784}
785
786impl<E, S, I> Debug for RawCacheEntry<E, S, I>
787where
788 E: Eviction,
789 S: HashBuilder,
790 I: Indexer<Eviction = E>,
791{
792 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
793 f.debug_struct("RawCacheEntry").field("record", &self.record).finish()
794 }
795}
796
797impl<E, S, I> Drop for RawCacheEntry<E, S, I>
798where
799 E: Eviction,
800 S: HashBuilder,
801 I: Indexer<Eviction = E>,
802{
803 fn drop(&mut self) {
804 let hash = self.record.hash();
805 let shard = &self.inner.shards[hash as usize % self.inner.shards.len()];
806
807 if self.record.dec_refs(1) == 0 {
808 match E::release() {
809 Op::Noop => {}
810 Op::Immutable(_) => shard.read().with(|shard| shard.release_immutable(&self.record)),
811 Op::Mutable(_) => shard.write().with(|mut shard| shard.release_mutable(&self.record)),
812 }
813
814 if self.record.is_ephemeral() {
815 shard
816 .write()
817 .with(|mut shard| shard.remove(hash, self.key()))
818 .inspect(|record| {
819 let pipe = self.inner.pipe.load();
821 let piped = pipe.is_enabled();
822 let event = Event::Evict;
823 if self.inner.event_listener.is_some() || piped {
824 if let Some(listener) = self.inner.event_listener.as_ref() {
825 listener.on_leave(Event::Evict, record.key(), record.value());
826 }
827 }
828 if piped && event == Event::Evict {
829 pipe.send(self.piece());
830 }
831 });
832 }
833 }
834 }
835}
836
837impl<E, S, I> Clone for RawCacheEntry<E, S, I>
838where
839 E: Eviction,
840 S: HashBuilder,
841 I: Indexer<Eviction = E>,
842{
843 fn clone(&self) -> Self {
844 self.record.inc_refs(1);
845 Self {
846 inner: self.inner.clone(),
847 record: self.record.clone(),
848 }
849 }
850}
851
852impl<E, S, I> Deref for RawCacheEntry<E, S, I>
853where
854 E: Eviction,
855 S: HashBuilder,
856 I: Indexer<Eviction = E>,
857{
858 type Target = E::Value;
859
860 fn deref(&self) -> &Self::Target {
861 self.value()
862 }
863}
864
865unsafe impl<E, S, I> Send for RawCacheEntry<E, S, I>
866where
867 E: Eviction,
868 S: HashBuilder,
869 I: Indexer<Eviction = E>,
870{
871}
872
873unsafe impl<E, S, I> Sync for RawCacheEntry<E, S, I>
874where
875 E: Eviction,
876 S: HashBuilder,
877 I: Indexer<Eviction = E>,
878{
879}
880
881impl<E, S, I> RawCacheEntry<E, S, I>
882where
883 E: Eviction,
884 S: HashBuilder,
885 I: Indexer<Eviction = E>,
886{
887 pub fn hash(&self) -> u64 {
888 self.record.hash()
889 }
890
891 pub fn key(&self) -> &E::Key {
892 self.record.key()
893 }
894
895 pub fn value(&self) -> &E::Value {
896 self.record.value()
897 }
898
899 pub fn properties(&self) -> &E::Properties {
900 self.record.properties()
901 }
902
903 pub fn weight(&self) -> usize {
904 self.record.weight()
905 }
906
907 pub fn refs(&self) -> usize {
908 self.record.refs()
909 }
910
911 pub fn is_outdated(&self) -> bool {
912 !self.record.is_in_indexer()
913 }
914
915 pub fn piece(&self) -> Piece<E::Key, E::Value, E::Properties> {
916 Piece::new(self.record.clone())
917 }
918}
919
920#[derive(Debug, Clone, Copy, PartialEq, Eq)]
922pub enum FetchState {
923 Hit,
925 Wait,
927 Miss,
929}
930
931#[derive(Debug)]
933pub struct FetchContext {
934 pub throttled: bool,
936 pub source: Source,
938}
939
940enum RawShardFetch<E, S, I>
941where
942 E: Eviction,
943 S: HashBuilder,
944 I: Indexer<Eviction = E>,
945{
946 Hit(Arc<Record<E>>),
947 Wait(RawFetchWait<E, S, I>),
948 Miss,
949}
950
951pub type RawFetch<E, ER, S, I = HashTableIndexer<E>> =
952 DiversionFuture<RawFetchInner<E, ER, S, I>, std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchContext>;
953
954type RawFetchHit<E, S, I> = Option<RawCacheEntry<E, S, I>>;
955#[cfg(feature = "tracing")]
956type RawFetchWait<E, S, I> = InSpan<oneshot::Receiver<RawCacheEntry<E, S, I>>>;
957#[cfg(not(feature = "tracing"))]
958type RawFetchWait<E, S, I> = oneshot::Receiver<RawCacheEntry<E, S, I>>;
959type RawFetchMiss<E, I, S, ER, DFS> = JoinHandle<Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, DFS>>;
960
961#[pin_project(project = RawFetchInnerProj)]
962pub enum RawFetchInner<E, ER, S, I>
963where
964 E: Eviction,
965 S: HashBuilder,
966 I: Indexer<Eviction = E>,
967{
968 Hit(RawFetchHit<E, S, I>),
969 Wait(#[pin] RawFetchWait<E, S, I>),
970 Miss(#[pin] RawFetchMiss<E, I, S, ER, FetchContext>),
971}
972
973impl<E, ER, S, I> RawFetchInner<E, ER, S, I>
974where
975 E: Eviction,
976 S: HashBuilder,
977 I: Indexer<Eviction = E>,
978{
979 pub fn state(&self) -> FetchState {
980 match self {
981 RawFetchInner::Hit(_) => FetchState::Hit,
982 RawFetchInner::Wait(_) => FetchState::Wait,
983 RawFetchInner::Miss(_) => FetchState::Miss,
984 }
985 }
986}
987
988impl<E, ER, S, I> Future for RawFetchInner<E, ER, S, I>
989where
990 E: Eviction,
991 ER: From<oneshot::error::RecvError>,
992 S: HashBuilder,
993 I: Indexer<Eviction = E>,
994{
995 type Output = Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchContext>;
996
997 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
998 match self.project() {
999 RawFetchInnerProj::Hit(opt) => Poll::Ready(Ok(opt.take().unwrap()).into()),
1000 RawFetchInnerProj::Wait(waiter) => waiter.poll(cx).map_err(|err| err.into()).map(Diversion::from),
1001 RawFetchInnerProj::Miss(handle) => handle.poll(cx).map(|join| join.unwrap()),
1002 }
1003 }
1004}
1005
1006impl<E, S, I> RawCache<E, S, I>
1007where
1008 E: Eviction,
1009 S: HashBuilder,
1010 I: Indexer<Eviction = E>,
1011 E::Key: Clone,
1012{
1013 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::fetch"))]
1014 pub fn fetch<F, FU, ER>(&self, key: E::Key, fetch: F) -> RawFetch<E, ER, S, I>
1015 where
1016 F: FnOnce() -> FU,
1017 FU: Future<Output = std::result::Result<E::Value, ER>> + Send + 'static,
1018 ER: Send + 'static + Debug,
1019 {
1020 self.fetch_inner(
1021 key,
1022 Default::default(),
1023 fetch,
1024 &tokio::runtime::Handle::current().into(),
1025 )
1026 }
1027
1028 #[cfg_attr(
1029 feature = "tracing",
1030 fastrace::trace(name = "foyer::memory::raw::fetch_with_properties")
1031 )]
1032 pub fn fetch_with_properties<F, FU, ER, ID>(
1033 &self,
1034 key: E::Key,
1035 properties: E::Properties,
1036 fetch: F,
1037 ) -> RawFetch<E, ER, S, I>
1038 where
1039 F: FnOnce() -> FU,
1040 FU: Future<Output = ID> + Send + 'static,
1041 ER: Send + 'static + Debug,
1042 ID: Into<Diversion<std::result::Result<E::Value, ER>, FetchContext>>,
1043 {
1044 self.fetch_inner(key, properties, fetch, &tokio::runtime::Handle::current().into())
1045 }
1046
1047 #[doc(hidden)]
1051 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::fetch_inner"))]
1052 pub fn fetch_inner<F, FU, ER, ID>(
1053 &self,
1054 key: E::Key,
1055 mut properties: E::Properties,
1056 fetch: F,
1057 runtime: &SingletonHandle,
1058 ) -> RawFetch<E, ER, S, I>
1059 where
1060 F: FnOnce() -> FU,
1061 FU: Future<Output = ID> + Send + 'static,
1062 ER: Send + 'static + Debug,
1063 ID: Into<Diversion<std::result::Result<E::Value, ER>, FetchContext>>,
1064 {
1065 let hash = self.inner.hash_builder.hash_one(&key);
1066
1067 let raw = match E::acquire() {
1068 Op::Noop => self.inner.shards[self.shard(hash)].read().fetch_noop(hash, &key),
1069 Op::Immutable(_) => self.inner.shards[self.shard(hash)].read().fetch_immutable(hash, &key),
1070 Op::Mutable(_) => self.inner.shards[self.shard(hash)].write().fetch_mutable(hash, &key),
1071 };
1072
1073 match raw {
1074 RawShardFetch::Hit(record) => {
1075 return RawFetch::new(RawFetchInner::Hit(Some(RawCacheEntry {
1076 record,
1077 inner: self.inner.clone(),
1078 })))
1079 }
1080 RawShardFetch::Wait(future) => return RawFetch::new(RawFetchInner::Wait(future)),
1081 RawShardFetch::Miss => {}
1082 }
1083
1084 let cache = self.clone();
1085 let future = fetch();
1086 let join = runtime.spawn({
1087 let task = async move {
1088 #[cfg(feature = "tracing")]
1089 let Diversion { target, store } = future
1090 .in_span(Span::enter_with_local_parent("foyer::memory::raw::fetch_inner::fn"))
1091 .await
1092 .into();
1093 #[cfg(not(feature = "tracing"))]
1094 let Diversion { target, store } = future.await.into();
1095
1096 let value = match target {
1097 Ok(value) => value,
1098 Err(e) => {
1099 cache.inner.shards[cache.shard(hash)].read().waiters.lock().remove(&key);
1100 tracing::debug!("[fetch]: error raise while fetching, all waiter are dropped, err: {e:?}");
1101 return Diversion { target: Err(e), store };
1102 }
1103 };
1104 if let Some(ctx) = store.as_ref() {
1105 if ctx.throttled {
1106 properties = properties.with_location(Location::InMem)
1107 }
1108 properties = properties.with_source(ctx.source)
1109 };
1110 let location = properties.location().unwrap_or_default();
1111 properties = properties.with_ephemeral(matches! {location, Location::OnDisk});
1112 let entry = cache.insert_with_properties(key, value, properties);
1113 Diversion {
1114 target: Ok(entry),
1115 store,
1116 }
1117 };
1118 #[cfg(feature = "tracing")]
1119 let task = task.in_span(Span::enter_with_local_parent(
1120 "foyer::memory::generic::fetch_with_runtime::spawn",
1121 ));
1122 task
1123 });
1124
1125 RawFetch::new(RawFetchInner::Miss(join))
1126 }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131 use foyer_common::hasher::ModHasher;
1132 use rand::{rngs::SmallRng, seq::IndexedRandom, RngCore, SeedableRng};
1133
1134 use super::*;
1135 use crate::{
1136 eviction::{
1137 fifo::{Fifo, FifoConfig},
1138 lfu::{Lfu, LfuConfig},
1139 lru::{Lru, LruConfig},
1140 s3fifo::{S3Fifo, S3FifoConfig},
1141 test_utils::TestProperties,
1142 },
1143 test_utils::PiecePipe,
1144 };
1145
1146 fn is_send_sync_static<T: Send + Sync + 'static>() {}
1147
1148 #[test]
1149 fn test_send_sync_static() {
1150 is_send_sync_static::<RawCache<Fifo<(), (), TestProperties>, ModHasher>>();
1151 is_send_sync_static::<RawCache<S3Fifo<(), (), TestProperties>, ModHasher>>();
1152 is_send_sync_static::<RawCache<Lfu<(), (), TestProperties>, ModHasher>>();
1153 is_send_sync_static::<RawCache<Lru<(), (), TestProperties>, ModHasher>>();
1154 }
1155
1156 #[expect(clippy::type_complexity)]
1157 fn fifo_cache_for_test(
1158 ) -> RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Fifo<u64, u64, TestProperties>>> {
1159 RawCache::new(RawCacheConfig {
1160 capacity: 256,
1161 shards: 4,
1162 eviction_config: FifoConfig::default(),
1163 hash_builder: Default::default(),
1164 weighter: Arc::new(|_, _| 1),
1165 event_listener: None,
1166 metrics: Arc::new(Metrics::noop()),
1167 })
1168 }
1169
1170 #[expect(clippy::type_complexity)]
1171 fn s3fifo_cache_for_test(
1172 ) -> RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<S3Fifo<u64, u64, TestProperties>>> {
1173 RawCache::new(RawCacheConfig {
1174 capacity: 256,
1175 shards: 4,
1176 eviction_config: S3FifoConfig::default(),
1177 hash_builder: Default::default(),
1178 weighter: Arc::new(|_, _| 1),
1179 event_listener: None,
1180 metrics: Arc::new(Metrics::noop()),
1181 })
1182 }
1183
1184 #[expect(clippy::type_complexity)]
1185 fn lru_cache_for_test(
1186 ) -> RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lru<u64, u64, TestProperties>>> {
1187 RawCache::new(RawCacheConfig {
1188 capacity: 256,
1189 shards: 4,
1190 eviction_config: LruConfig::default(),
1191 hash_builder: Default::default(),
1192 weighter: Arc::new(|_, _| 1),
1193 event_listener: None,
1194 metrics: Arc::new(Metrics::noop()),
1195 })
1196 }
1197
1198 #[expect(clippy::type_complexity)]
1199 fn lfu_cache_for_test(
1200 ) -> RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lfu<u64, u64, TestProperties>>> {
1201 RawCache::new(RawCacheConfig {
1202 capacity: 256,
1203 shards: 4,
1204 eviction_config: LfuConfig::default(),
1205 hash_builder: Default::default(),
1206 weighter: Arc::new(|_, _| 1),
1207 event_listener: None,
1208 metrics: Arc::new(Metrics::noop()),
1209 })
1210 }
1211
1212 #[test_log::test]
1213 fn test_insert_ephemeral() {
1214 let fifo = fifo_cache_for_test();
1215
1216 let e1 = fifo.insert_with_properties(1, 1, TestProperties::default().with_ephemeral(true));
1217 assert_eq!(fifo.usage(), 1);
1218 drop(e1);
1219 assert_eq!(fifo.usage(), 0);
1220
1221 let e2a = fifo.insert_with_properties(2, 2, TestProperties::default().with_ephemeral(true));
1222 assert_eq!(fifo.usage(), 1);
1223 let e2b = fifo.get(&2).expect("entry 2 should exist");
1224 drop(e2a);
1225 assert_eq!(fifo.usage(), 1);
1226 drop(e2b);
1227 assert_eq!(fifo.usage(), 1);
1228 }
1229
1230 #[test]
1231 fn test_evict_all() {
1232 let pipe = Box::new(PiecePipe::default());
1233
1234 let fifo = fifo_cache_for_test();
1235 fifo.set_pipe(pipe.clone());
1236 for i in 0..fifo.capacity() as _ {
1237 fifo.insert(i, i);
1238 }
1239 assert_eq!(fifo.usage(), fifo.capacity());
1240
1241 fifo.evict_all();
1242 let mut pieces = pipe
1243 .pieces()
1244 .iter()
1245 .map(|p| (p.hash(), *p.key(), *p.value()))
1246 .collect_vec();
1247 pieces.sort_by_key(|t| t.0);
1248 let expected = (0..fifo.capacity() as u64).map(|i| (i, i, i)).collect_vec();
1249 assert_eq!(pieces, expected);
1250 }
1251
1252 #[test]
1253 fn test_insert_size_over_capacity() {
1254 let cache: RawCache<Fifo<Vec<u8>, Vec<u8>, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1255 capacity: 4 * 1024, shards: 1,
1257 eviction_config: FifoConfig::default(),
1258 hash_builder: Default::default(),
1259 weighter: Arc::new(|k, v| k.len() + v.len()),
1260 event_listener: None,
1261 metrics: Arc::new(Metrics::noop()),
1262 });
1263
1264 let key = vec![b'k'; 1024]; let value = vec![b'v'; 5 * 1024]; cache.insert(key.clone(), value.clone());
1268 assert_eq!(cache.usage(), 6 * 1024);
1269 assert_eq!(cache.get(&key).unwrap().value(), &value);
1270 }
1271
1272 fn test_resize<E>(cache: &RawCache<E, ModHasher, HashTableIndexer<E>>)
1273 where
1274 E: Eviction<Key = u64, Value = u64>,
1275 {
1276 let capacity = cache.capacity();
1277 for i in 0..capacity as u64 * 2 {
1278 cache.insert(i, i);
1279 }
1280 assert_eq!(cache.usage(), capacity);
1281 cache.resize(capacity / 2).unwrap();
1282 assert_eq!(cache.usage(), capacity / 2);
1283 for i in 0..capacity as u64 * 2 {
1284 cache.insert(i, i);
1285 }
1286 assert_eq!(cache.usage(), capacity / 2);
1287 }
1288
1289 #[test]
1290 fn test_fifo_cache_resize() {
1291 let cache = fifo_cache_for_test();
1292 test_resize(&cache);
1293 }
1294
1295 #[test]
1296 fn test_s3fifo_cache_resize() {
1297 let cache = s3fifo_cache_for_test();
1298 test_resize(&cache);
1299 }
1300
1301 #[test]
1302 fn test_lru_cache_resize() {
1303 let cache = lru_cache_for_test();
1304 test_resize(&cache);
1305 }
1306
1307 #[test]
1308 fn test_lfu_cache_resize() {
1309 let cache = lfu_cache_for_test();
1310 test_resize(&cache);
1311 }
1312
1313 mod fuzzy {
1314 use foyer_common::properties::Hint;
1315
1316 use super::*;
1317
1318 fn fuzzy<E, S>(cache: RawCache<E, S>, hints: Vec<Hint>)
1319 where
1320 E: Eviction<Key = u64, Value = u64, Properties = TestProperties>,
1321 S: HashBuilder,
1322 {
1323 let handles = (0..8)
1324 .map(|i| {
1325 let c = cache.clone();
1326 let hints = hints.clone();
1327 std::thread::spawn(move || {
1328 let mut rng = SmallRng::seed_from_u64(i);
1329 for _ in 0..100000 {
1330 let key = rng.next_u64();
1331 if let Some(entry) = c.get(&key) {
1332 assert_eq!(key, *entry);
1333 drop(entry);
1334 continue;
1335 }
1336 let hint = hints.choose(&mut rng).cloned().unwrap();
1337 c.insert_with_properties(key, key, TestProperties::default().with_hint(hint));
1338 }
1339 })
1340 })
1341 .collect_vec();
1342
1343 handles.into_iter().for_each(|handle| handle.join().unwrap());
1344
1345 assert_eq!(cache.usage(), cache.capacity());
1346 }
1347
1348 #[test_log::test]
1349 fn test_fifo_cache_fuzzy() {
1350 let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1351 capacity: 256,
1352 shards: 4,
1353 eviction_config: FifoConfig::default(),
1354 hash_builder: Default::default(),
1355 weighter: Arc::new(|_, _| 1),
1356 event_listener: None,
1357 metrics: Arc::new(Metrics::noop()),
1358 });
1359 let hints = vec![Hint::Normal];
1360 fuzzy(cache, hints);
1361 }
1362
1363 #[test_log::test]
1364 fn test_s3fifo_cache_fuzzy() {
1365 let cache: RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1366 capacity: 256,
1367 shards: 4,
1368 eviction_config: S3FifoConfig::default(),
1369 hash_builder: Default::default(),
1370 weighter: Arc::new(|_, _| 1),
1371 event_listener: None,
1372 metrics: Arc::new(Metrics::noop()),
1373 });
1374 let hints = vec![Hint::Normal];
1375 fuzzy(cache, hints);
1376 }
1377
1378 #[test_log::test]
1379 fn test_lru_cache_fuzzy() {
1380 let cache: RawCache<Lru<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1381 capacity: 256,
1382 shards: 4,
1383 eviction_config: LruConfig::default(),
1384 hash_builder: Default::default(),
1385 weighter: Arc::new(|_, _| 1),
1386 event_listener: None,
1387 metrics: Arc::new(Metrics::noop()),
1388 });
1389 let hints = vec![Hint::Normal, Hint::Low];
1390 fuzzy(cache, hints);
1391 }
1392
1393 #[test_log::test]
1394 fn test_lfu_cache_fuzzy() {
1395 let cache: RawCache<Lfu<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1396 capacity: 256,
1397 shards: 4,
1398 eviction_config: LfuConfig::default(),
1399 hash_builder: Default::default(),
1400 weighter: Arc::new(|_, _| 1),
1401 event_listener: None,
1402 metrics: Arc::new(Metrics::noop()),
1403 });
1404 let hints = vec![Hint::Normal];
1405 fuzzy(cache, hints);
1406 }
1407 }
1408}