use std::{
any::Any,
fmt::Debug,
future::Future,
hash::Hash,
ops::Deref,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll},
};
use equivalent::Equivalent;
use foyer_common::{
code::HashBuilder,
error::{Error, ErrorKind, Result},
event::{Event, EventListener},
metrics::Metrics,
properties::{Location, Properties, Source},
spawn::Spawner,
strict_assert,
utils::scope::Scope,
};
use futures_util::FutureExt as _;
use itertools::Itertools;
use parking_lot::{Mutex, RwLock};
use pin_project::{pin_project, pinned_drop};
use crate::{
eviction::{Eviction, Op},
indexer::{sentry::Sentry, Indexer},
inflight::{
Enqueue, FetchOrTake, FetchTarget, InflightManager, Notifier, OptionalFetch, OptionalFetchBuilder,
RequiredFetch, RequiredFetchBuilder, Waiter,
},
pipe::{ArcPipe, NoopPipe},
record::{Data, Record},
Piece,
};
pub trait Weighter<K, V>: Fn(&K, &V) -> usize + Send + Sync + 'static {}
impl<K, V, T> Weighter<K, V> for T where T: Fn(&K, &V) -> usize + Send + Sync + 'static {}
pub trait Filter<K, V>: Fn(&K, &V) -> bool + Send + Sync + 'static {}
impl<K, V, T> Filter<K, V> for T where T: Fn(&K, &V) -> bool + Send + Sync + 'static {}
pub struct RawCacheConfig<E, S>
where
E: Eviction,
S: HashBuilder,
{
pub capacity: usize,
pub shards: usize,
pub eviction_config: E::Config,
pub hash_builder: S,
pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
pub filter: Arc<dyn Filter<E::Key, E::Value>>,
pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
pub metrics: Arc<Metrics>,
}
struct RawCacheShard<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
eviction: E,
indexer: Sentry<I>,
usage: usize,
entries: usize,
capacity: usize,
inflights: Arc<Mutex<InflightManager<E, S, I>>>,
metrics: Arc<Metrics>,
_event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
}
impl<E, S, I> RawCacheShard<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
fn evict(&mut self, target: usize, garbages: &mut Vec<(Event, Arc<Record<E>>)>) {
while self.usage > target {
let evicted = match self.eviction.pop() {
Some(evicted) => evicted,
None => break,
};
self.metrics.memory_evict.increase(1);
let e = self.indexer.remove(evicted.hash(), evicted.key()).unwrap();
assert_eq!(Arc::as_ptr(&evicted), Arc::as_ptr(&e));
strict_assert!(!evicted.as_ref().is_in_indexer());
strict_assert!(!evicted.as_ref().is_in_eviction());
self.usage -= evicted.weight();
self.entries -= 1;
self.metrics.memory_entries.decrease(1);
garbages.push((Event::Evict, evicted));
}
}
#[expect(clippy::type_complexity)]
fn emplace(
&mut self,
record: Arc<Record<E>>,
garbages: &mut Vec<(Event, Arc<Record<E>>)>,
notifiers: &mut Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
) {
*notifiers = self
.inflights
.lock()
.take(record.hash(), record.key(), None)
.unwrap_or_default();
if record.properties().phantom().unwrap_or_default() {
if let Some(old) = self.indexer.remove(record.hash(), record.key()) {
strict_assert!(!old.is_in_indexer());
if old.is_in_eviction() {
self.eviction.remove(&old);
}
strict_assert!(!old.is_in_eviction());
self.usage -= old.weight();
self.entries -= 1;
self.metrics.memory_entries.decrease(1);
garbages.push((Event::Replace, old));
}
record.inc_refs(notifiers.len() + 1);
garbages.push((Event::Remove, record));
self.metrics.memory_insert.increase(1);
return;
}
let weight = record.weight();
let old_usage = self.usage;
self.evict(self.capacity.saturating_sub(weight), garbages);
if let Some(old) = self.indexer.insert(record.clone()) {
self.metrics.memory_replace.increase(1);
strict_assert!(!old.is_in_indexer());
if old.is_in_eviction() {
self.eviction.remove(&old);
}
strict_assert!(!old.is_in_eviction());
self.usage -= old.weight();
garbages.push((Event::Replace, old));
} else {
self.metrics.memory_insert.increase(1);
self.entries += 1;
self.metrics.memory_entries.increase(1);
}
strict_assert!(record.is_in_indexer());
strict_assert!(!record.is_in_eviction());
self.eviction.push(record.clone());
strict_assert!(record.is_in_eviction());
self.usage += weight;
record.inc_refs(notifiers.len() + 1);
match self.usage.cmp(&old_usage) {
std::cmp::Ordering::Greater => self.metrics.memory_usage.increase((self.usage - old_usage) as _),
std::cmp::Ordering::Less => self.metrics.memory_usage.decrease((old_usage - self.usage) as _),
std::cmp::Ordering::Equal => {}
}
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::remove"))]
fn remove<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
where
Q: Hash + Equivalent<E::Key> + ?Sized,
{
let record = self.indexer.remove(hash, key)?;
if record.is_in_eviction() {
self.eviction.remove(&record);
}
strict_assert!(!record.is_in_indexer());
strict_assert!(!record.is_in_eviction());
self.usage -= record.weight();
self.entries -= 1;
self.metrics.memory_remove.increase(1);
self.metrics.memory_usage.decrease(record.weight() as _);
self.metrics.memory_entries.decrease(1);
record.inc_refs(1);
Some(record)
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_noop"))]
fn get_noop<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
where
Q: Hash + Equivalent<E::Key> + ?Sized,
{
self.get_inner(hash, key)
}
#[cfg_attr(
feature = "tracing",
fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")
)]
fn get_immutable<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
where
Q: Hash + Equivalent<E::Key> + ?Sized,
{
self.get_inner(hash, key)
.inspect(|record| self.acquire_immutable(record))
}
#[cfg_attr(
feature = "tracing",
fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")
)]
fn get_mutable<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
where
Q: Hash + Equivalent<E::Key> + ?Sized,
{
self.get_inner(hash, key).inspect(|record| self.acquire_mutable(record))
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_inner"))]
fn get_inner<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
where
Q: Hash + Equivalent<E::Key> + ?Sized,
{
let record = match self.indexer.get(hash, key).cloned() {
Some(record) => {
self.metrics.memory_hit.increase(1);
record
}
None => {
self.metrics.memory_miss.increase(1);
return None;
}
};
strict_assert!(record.is_in_indexer());
record.inc_refs(1);
Some(record)
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::clear"))]
fn clear(&mut self, garbages: &mut Vec<Arc<Record<E>>>) {
let records = self.indexer.drain().collect_vec();
self.eviction.clear();
let mut count = 0;
for record in records {
count += 1;
strict_assert!(!record.is_in_indexer());
strict_assert!(!record.is_in_eviction());
garbages.push(record);
}
self.entries = 0;
if count > 0 {
self.metrics.memory_entries.decrease(count);
self.metrics.memory_remove.increase(count);
}
}
#[cfg_attr(
feature = "tracing",
fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")
)]
fn acquire_immutable(&self, record: &Arc<Record<E>>) {
match E::acquire() {
Op::Immutable(f) => f(&self.eviction, record),
_ => unreachable!(),
}
}
#[cfg_attr(
feature = "tracing",
fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")
)]
fn acquire_mutable(&mut self, record: &Arc<Record<E>>) {
match E::acquire() {
Op::Mutable(mut f) => f(&mut self.eviction, record),
_ => unreachable!(),
}
}
#[cfg_attr(
feature = "tracing",
fastrace::trace(name = "foyer::memory::raw::shard::release_immutable")
)]
fn release_immutable(&self, record: &Arc<Record<E>>) {
match E::release() {
Op::Immutable(f) => f(&self.eviction, record),
_ => unreachable!(),
}
}
#[cfg_attr(
feature = "tracing",
fastrace::trace(name = "foyer::memory::raw::shard::release_mutable")
)]
fn release_mutable(&mut self, record: &Arc<Record<E>>) {
match E::release() {
Op::Mutable(mut f) => f(&mut self.eviction, record),
_ => unreachable!(),
}
}
}
struct RawCacheInner<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
shards: Vec<RwLock<RawCacheShard<E, S, I>>>,
capacity: usize,
hash_builder: Arc<S>,
weighter: Arc<dyn Weighter<E::Key, E::Value>>,
filter: Arc<dyn Filter<E::Key, E::Value>>,
metrics: Arc<Metrics>,
event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
}
impl<E, S, I> RawCacheInner<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::inner::clear"))]
fn clear(&self) {
let mut garbages = vec![];
self.shards
.iter()
.map(|shard| shard.write())
.for_each(|mut shard| shard.clear(&mut garbages));
if let Some(listener) = self.event_listener.as_ref() {
for record in garbages {
listener.on_leave(Event::Clear, record.key(), record.value());
}
}
}
}
pub struct RawCache<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
pipe: ArcPipe<E::Key, E::Value, E::Properties>,
inner: Arc<RawCacheInner<E, S, I>>,
}
impl<E, S, I> Clone for RawCache<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
fn clone(&self) -> Self {
Self {
pipe: self.pipe.clone(),
inner: self.inner.clone(),
}
}
}
impl<E, S, I> Drop for RawCacheInner<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
fn drop(&mut self) {
self.clear();
}
}
impl<E, S, I> RawCache<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
pub fn new(config: RawCacheConfig<E, S>) -> Self {
assert!(config.shards > 0, "shards must be greater than zero.");
let shard_capacities = (0..config.shards)
.map(|index| Self::shard_capacity_for(config.capacity, config.shards, index))
.collect_vec();
let shards = shard_capacities
.into_iter()
.map(|shard_capacity| RawCacheShard {
eviction: E::new(shard_capacity, &config.eviction_config),
indexer: Sentry::default(),
usage: 0,
entries: 0,
capacity: shard_capacity,
inflights: Arc::new(Mutex::new(InflightManager::new())),
metrics: config.metrics.clone(),
_event_listener: config.event_listener.clone(),
})
.map(RwLock::new)
.collect_vec();
Self {
pipe: Arc::new(NoopPipe::default()),
inner: Arc::new(RawCacheInner {
shards,
capacity: config.capacity,
hash_builder: Arc::new(config.hash_builder),
weighter: config.weighter,
filter: config.filter,
metrics: config.metrics,
event_listener: config.event_listener,
}),
}
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::resize"))]
pub fn resize(&self, capacity: usize) -> Result<()> {
let shards = self.inner.shards.len();
assert!(shards > 0, "shards must be greater than zero.");
let shard_capacities = (0..shards)
.map(|index| Self::shard_capacity_for(capacity, shards, index))
.collect_vec();
let handles = shard_capacities
.into_iter()
.enumerate()
.map(|(i, shard_capacity)| {
let pipe = self.pipe.clone();
let inner = self.inner.clone();
std::thread::spawn(move || {
let mut garbages = vec![];
let res = inner.shards[i].write().with(|mut shard| {
shard.eviction.update(shard_capacity, None).inspect(|_| {
shard.capacity = shard_capacity;
shard.evict(shard_capacity, &mut garbages)
})
});
let piped = pipe.is_enabled();
if inner.event_listener.is_some() || piped {
for (event, record) in garbages {
if let Some(listener) = inner.event_listener.as_ref() {
listener.on_leave(event, record.key(), record.value())
}
if piped && event == Event::Evict {
pipe.send(Piece::new(record));
}
}
}
res
})
})
.collect_vec();
let errs = handles
.into_iter()
.map(|handle| handle.join().unwrap())
.filter(|res| res.is_err())
.map(|res| res.unwrap_err())
.collect_vec();
if !errs.is_empty() {
let mut e = Error::new(ErrorKind::Config, "resize raw cache failed");
for err in errs {
e = e.with_context("reason", format!("{err}"));
}
return Err(e);
}
Ok(())
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert"))]
pub fn insert(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
self.insert_with_properties(key, value, Default::default())
}
#[cfg_attr(
feature = "tracing",
fastrace::trace(name = "foyer::memory::raw::insert_with_properties")
)]
pub fn insert_with_properties(
&self,
key: E::Key,
value: E::Value,
properties: E::Properties,
) -> RawCacheEntry<E, S, I> {
self.insert_with_properties_inner(key, value, properties, Source::Outer)
}
fn insert_with_properties_inner(
&self,
key: E::Key,
value: E::Value,
mut properties: E::Properties,
source: Source,
) -> RawCacheEntry<E, S, I> {
let hash = self.inner.hash_builder.hash_one(&key);
let weight = (self.inner.weighter)(&key, &value);
if !(self.inner.filter)(&key, &value) {
properties = properties.with_phantom(true);
}
if let Some(location) = properties.location() {
if location == Location::OnDisk {
properties = properties.with_phantom(true);
}
}
let record = Arc::new(Record::new(Data {
key,
value,
properties,
hash,
weight,
}));
self.insert_inner(record, source)
}
#[doc(hidden)]
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_piece"))]
pub fn insert_piece(&self, piece: Piece<E::Key, E::Value, E::Properties>) -> RawCacheEntry<E, S, I> {
self.insert_inner(piece.into_record(), Source::Memory)
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_inner"))]
fn insert_inner(&self, record: Arc<Record<E>>, source: Source) -> RawCacheEntry<E, S, I> {
let mut garbages = vec![];
let mut notifiers = vec![];
self.inner.shards[self.shard(record.hash())]
.write()
.with(|mut shard| shard.emplace(record.clone(), &mut garbages, &mut notifiers));
for notifier in notifiers {
let _ = notifier.send(Ok(Some(RawCacheEntry {
pipe: self.pipe.clone(),
record: record.clone(),
inner: self.inner.clone(),
source,
})));
}
let piped = self.pipe.is_enabled();
if self.inner.event_listener.is_some() || piped {
for (event, record) in garbages {
if let Some(listener) = self.inner.event_listener.as_ref() {
listener.on_leave(event, record.key(), record.value())
}
if piped && event == Event::Evict {
self.pipe.send(Piece::new(record));
}
}
}
RawCacheEntry {
record,
pipe: self.pipe.clone(),
inner: self.inner.clone(),
source,
}
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::evict_all"))]
pub fn evict_all(&self) {
let mut garbages = vec![];
for shard in self.inner.shards.iter() {
shard.write().evict(0, &mut garbages);
}
let piped = self.pipe.is_enabled();
if self.inner.event_listener.is_some() || piped {
for (event, record) in garbages {
if let Some(listener) = self.inner.event_listener.as_ref() {
listener.on_leave(event, record.key(), record.value())
}
if piped && event == Event::Evict {
self.pipe.send(Piece::new(record));
}
}
}
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::flush"))]
pub async fn flush(&self) {
let mut garbages = vec![];
for shard in self.inner.shards.iter() {
shard.write().evict(0, &mut garbages);
}
let piped = self.pipe.is_enabled();
if let Some(listener) = self.inner.event_listener.as_ref() {
for (event, record) in garbages.iter() {
listener.on_leave(*event, record.key(), record.value());
}
}
if piped {
let pieces = garbages.into_iter().map(|(_, record)| Piece::new(record)).collect_vec();
self.pipe.flush(pieces).await;
}
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::remove"))]
pub fn remove<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
where
Q: Hash + Equivalent<E::Key> + ?Sized,
{
let hash = self.inner.hash_builder.hash_one(key);
self.inner.shards[self.shard(hash)]
.write()
.with(|mut shard| {
shard.remove(hash, key).map(|record| RawCacheEntry {
pipe: self.pipe.clone(),
inner: self.inner.clone(),
record,
source: Source::Memory,
})
})
.inspect(|record| {
if let Some(listener) = self.inner.event_listener.as_ref() {
listener.on_leave(Event::Remove, record.key(), record.value());
}
})
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get"))]
pub fn get<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
where
Q: Hash + Equivalent<E::Key> + ?Sized,
{
let hash = self.inner.hash_builder.hash_one(key);
let record = match E::acquire() {
Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
Op::Immutable(_) => self.inner.shards[self.shard(hash)]
.read()
.with(|shard| shard.get_immutable(hash, key)),
Op::Mutable(_) => self.inner.shards[self.shard(hash)]
.write()
.with(|mut shard| shard.get_mutable(hash, key)),
}?;
Some(RawCacheEntry {
pipe: self.pipe.clone(),
inner: self.inner.clone(),
record,
source: Source::Memory,
})
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::contains"))]
pub fn contains<Q>(&self, key: &Q) -> bool
where
Q: Hash + Equivalent<E::Key> + ?Sized,
{
let hash = self.inner.hash_builder.hash_one(key);
self.inner.shards[self.shard(hash)]
.read()
.with(|shard| shard.indexer.get(hash, key).is_some())
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::touch"))]
pub fn touch<Q>(&self, key: &Q) -> bool
where
Q: Hash + Equivalent<E::Key> + ?Sized,
{
let hash = self.inner.hash_builder.hash_one(key);
match E::acquire() {
Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
Op::Immutable(_) => self.inner.shards[self.shard(hash)]
.read()
.with(|shard| shard.get_immutable(hash, key)),
Op::Mutable(_) => self.inner.shards[self.shard(hash)]
.write()
.with(|mut shard| shard.get_mutable(hash, key)),
}
.is_some()
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::clear"))]
pub fn clear(&self) {
self.inner.clear();
}
pub fn capacity(&self) -> usize {
self.inner.capacity
}
pub fn usage(&self) -> usize {
self.inner.shards.iter().map(|shard| shard.read().usage).sum()
}
pub fn entries(&self) -> usize {
self.inner.shards.iter().map(|shard| shard.read().entries).sum()
}
pub fn metrics(&self) -> &Metrics {
&self.inner.metrics
}
pub fn hash_builder(&self) -> &Arc<S> {
&self.inner.hash_builder
}
pub fn shards(&self) -> usize {
self.inner.shards.len()
}
pub(crate) fn with_pipe(mut self, pipe: ArcPipe<E::Key, E::Value, E::Properties>) -> Self {
self.pipe = pipe;
self
}
fn shard(&self, hash: u64) -> usize {
hash as usize % self.inner.shards.len()
}
fn shard_capacity_for(total: usize, shards: usize, index: usize) -> usize {
let base = total / shards;
let remainder = total % shards;
base + usize::from(index < remainder)
}
}
pub struct RawCacheEntry<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
pipe: ArcPipe<E::Key, E::Value, E::Properties>,
inner: Arc<RawCacheInner<E, S, I>>,
record: Arc<Record<E>>,
source: Source,
}
impl<E, S, I> Debug for RawCacheEntry<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RawCacheEntry").field("record", &self.record).finish()
}
}
impl<E, S, I> Drop for RawCacheEntry<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
fn drop(&mut self) {
let hash = self.record.hash();
let shard = &self.inner.shards[hash as usize % self.inner.shards.len()];
if self.record.dec_refs(1) == 0 {
if self.record.properties().phantom().unwrap_or_default() {
if let Some(listener) = self.inner.event_listener.as_ref() {
listener.on_leave(Event::Evict, self.record.key(), self.record.value());
}
if self.pipe.is_enabled() {
self.pipe.send(Piece::new(self.record.clone()));
}
return;
}
match E::release() {
Op::Noop => {}
Op::Immutable(_) => shard.read().with(|shard| shard.release_immutable(&self.record)),
Op::Mutable(_) => shard.write().with(|mut shard| shard.release_mutable(&self.record)),
}
}
}
}
impl<E, S, I> Clone for RawCacheEntry<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
fn clone(&self) -> Self {
self.record.inc_refs(1);
Self {
pipe: self.pipe.clone(),
inner: self.inner.clone(),
record: self.record.clone(),
source: self.source,
}
}
}
impl<E, S, I> Deref for RawCacheEntry<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
type Target = E::Value;
fn deref(&self) -> &Self::Target {
self.value()
}
}
unsafe impl<E, S, I> Send for RawCacheEntry<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
}
unsafe impl<E, S, I> Sync for RawCacheEntry<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
}
impl<E, S, I> RawCacheEntry<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
pub fn hash(&self) -> u64 {
self.record.hash()
}
pub fn key(&self) -> &E::Key {
self.record.key()
}
pub fn value(&self) -> &E::Value {
self.record.value()
}
pub fn properties(&self) -> &E::Properties {
self.record.properties()
}
pub fn weight(&self) -> usize {
self.record.weight()
}
pub fn refs(&self) -> usize {
self.record.refs()
}
pub fn is_outdated(&self) -> bool {
!self.record.is_in_indexer()
}
pub fn piece(&self) -> Piece<E::Key, E::Value, E::Properties> {
Piece::new(self.record.clone())
}
pub fn source(&self) -> Source {
self.source
}
}
impl<E, S, I> RawCache<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get_or_fetch"))]
pub fn get_or_fetch<Q, F, FU, IT, ER>(&self, key: &Q, fetch: F) -> RawGetOrFetch<E, S, I>
where
Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
F: FnOnce() -> FU,
FU: Future<Output = std::result::Result<IT, ER>> + Send + 'static,
IT: Into<FetchTarget<E::Key, E::Value, E::Properties>>,
ER: Into<anyhow::Error>,
{
let fut = fetch();
self.get_or_fetch_inner(
key,
|| None,
|| {
Some(Box::new(|_| {
async {
match fut.await {
Ok(it) => Ok(it.into()),
Err(e) => Err(Error::new(ErrorKind::External, "fetch failed").with_source(e)),
}
}
.boxed()
}))
},
(),
&Spawner::current(),
)
}
#[doc(hidden)]
#[cfg_attr(
feature = "tracing",
fastrace::trace(name = "foyer::memory::raw::get_or_fetch_inner")
)]
pub fn get_or_fetch_inner<Q, C, FO, FR>(
&self,
key: &Q,
fo: FO,
fr: FR,
ctx: C,
spawner: &Spawner,
) -> RawGetOrFetch<E, S, I>
where
Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
C: Any + Send + Sync + 'static,
FO: FnOnce() -> Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
FR: FnOnce() -> Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
{
let hash = self.inner.hash_builder.hash_one(key);
let extract = |key: &Q, opt: Option<Arc<Record<E>>>, inflights: &Arc<Mutex<InflightManager<E, S, I>>>| {
opt.map(|record| {
RawGetOrFetch::Hit(Some(RawCacheEntry {
pipe: self.pipe.clone(),
inner: self.inner.clone(),
record,
source: Source::Memory,
}))
})
.unwrap_or_else(|| match inflights.lock().enqueue(hash, key, fr()) {
Enqueue::Lead {
id,
close,
waiter,
required_fetch_builder,
} => {
let fetch = RawFetch {
state: RawFetchState::Init {
optional_fetch_builder: fo(),
required_fetch_builder,
},
id,
hash,
key: Some(key.to_owned()),
ctx,
cache: self.clone(),
inflights: inflights.clone(),
close,
};
spawner.spawn(fetch);
RawGetOrFetch::Miss(RawWait { waiter })
}
Enqueue::Wait(waiter) => RawGetOrFetch::Miss(RawWait { waiter }),
})
};
let res = match E::acquire() {
Op::Noop => self.inner.shards[self.shard(hash)]
.read()
.with(|shard| extract(key, shard.get_noop(hash, key), &shard.inflights)),
Op::Immutable(_) => self.inner.shards[self.shard(hash)]
.read()
.with(|shard| extract(key, shard.get_immutable(hash, key), &shard.inflights)),
Op::Mutable(_) => self.inner.shards[self.shard(hash)]
.write()
.with(|mut shard| extract(key, shard.get_mutable(hash, key), &shard.inflights)),
};
res
}
}
#[must_use]
#[pin_project(project = RawGetOrFetchProj)]
pub enum RawGetOrFetch<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
Hit(Option<RawCacheEntry<E, S, I>>),
Miss(#[pin] RawWait<E, S, I>),
}
impl<E, S, I> Debug for RawGetOrFetch<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Hit(e) => f.debug_tuple("Hit").field(e).finish(),
Self::Miss(fut) => f.debug_tuple("Miss").field(fut).finish(),
}
}
}
impl<E, S, I> Future for RawGetOrFetch<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
type Output = Result<Option<RawCacheEntry<E, S, I>>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this {
RawGetOrFetchProj::Hit(opt) => {
assert!(opt.is_some(), "entry is already taken");
Poll::Ready(Ok(opt.take()))
}
RawGetOrFetchProj::Miss(fut) => fut.poll(cx),
}
}
}
impl<E, S, I> RawGetOrFetch<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
pub fn need_await(&self) -> bool {
matches!(self, Self::Miss(_))
}
#[expect(clippy::allow_attributes)]
#[allow(clippy::result_large_err)]
pub fn try_unwrap(self) -> std::result::Result<Option<RawCacheEntry<E, S, I>>, Self> {
match self {
Self::Hit(opt) => {
assert!(opt.is_some(), "entry is already taken");
Ok(opt)
}
Self::Miss(_) => Err(self),
}
}
}
type Once<T> = Option<T>;
#[must_use]
enum Try<E, S, I, C>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
C: Any + Send + 'static,
{
Noop,
SetStateAndContinue(RawFetchState<E, S, I, C>),
Ready,
}
macro_rules! handle_try {
($state:expr, $method:ident($($args:expr),* $(,)?)) => {
handle_try! { $state, Self::$method($($args),*) }
};
($state:expr, $try:expr) => {
match $try {
Try::Noop => {}
Try::SetStateAndContinue(state) => {
$state = state;
continue;
},
Try::Ready => {
$state = RawFetchState::Ready;
return Poll::Ready(())
},
}
};
}
#[expect(clippy::type_complexity)]
pub enum RawFetchState<E, S, I, C>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
Init {
optional_fetch_builder: Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
},
FetchOptional {
optional_fetch: OptionalFetch<FetchTarget<E::Key, E::Value, E::Properties>>,
required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
},
FetchRequired {
required_fetch: RequiredFetch<FetchTarget<E::Key, E::Value, E::Properties>>,
},
Notify {
res: Option<Result<Option<RawCacheEntry<E, S, I>>>>,
notifiers: Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
},
Ready,
}
impl<E, S, I, C> Debug for RawFetchState<E, S, I, C>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Init { .. } => f.debug_struct("Init").finish(),
Self::FetchOptional { .. } => f.debug_struct("Optional").finish(),
Self::FetchRequired { .. } => f.debug_struct("Required").finish(),
Self::Notify { res, .. } => f.debug_struct("Notify").field("res", res).finish(),
Self::Ready => f.debug_struct("Ready").finish(),
}
}
}
#[pin_project]
pub struct RawWait<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
waiter: Waiter<Option<RawCacheEntry<E, S, I>>>,
}
impl<E, S, I> Debug for RawWait<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RawWait").field("waiter", &self.waiter).finish()
}
}
impl<E, S, I> Future for RawWait<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
type Output = Result<Option<RawCacheEntry<E, S, I>>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.waiter.poll_unpin(cx).map(|r| match r {
Ok(r) => r,
Err(e) => Err(Error::new(ErrorKind::ChannelClosed, "waiter channel closed").with_source(e)),
})
}
}
#[pin_project(PinnedDrop)]
pub struct RawFetch<E, S, I, C>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
state: RawFetchState<E, S, I, C>,
id: usize,
hash: u64,
key: Once<E::Key>,
ctx: C,
cache: RawCache<E, S, I>,
inflights: Arc<Mutex<InflightManager<E, S, I>>>,
close: Arc<AtomicBool>,
}
impl<E, S, I, C> Debug for RawFetch<E, S, I, C>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RawFetch")
.field("state", &self.state)
.field("id", &self.id)
.field("hash", &self.hash)
.finish()
}
}
impl<E, S, I, C> Future for RawFetch<E, S, I, C>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
C: Any + Send + 'static,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
loop {
match this.state {
RawFetchState::Init {
optional_fetch_builder,
required_fetch_builder,
} => {
handle_try! { *this.state, try_set_optional(optional_fetch_builder, required_fetch_builder, this.ctx) }
handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), this.inflights, Ok(None)) }
}
RawFetchState::FetchOptional {
optional_fetch,
required_fetch_builder,
} => {
if this.close.load(Ordering::Relaxed) {
return Poll::Ready(());
}
match optional_fetch.poll_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(Some(target))) => {
handle_try! {*this.state, handle_target(target, this.key, this.cache, Source::Disk) }
}
Poll::Ready(Ok(None)) => {
handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), &this.inflights, Ok(None)) }
}
Poll::Ready(Err(e)) => {
handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), &this.inflights, Err(e)) }
}
}
}
RawFetchState::FetchRequired { required_fetch } => {
if this.close.load(Ordering::Relaxed) {
return Poll::Ready(());
}
match required_fetch.poll_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(target)) => {
handle_try! { *this.state, handle_target(target, this.key, this.cache, Source::Outer) }
}
Poll::Ready(Err(e)) => {
handle_try! { *this.state, handle_error(e, *this.id, *this.hash, this.key.as_ref().unwrap(), this.inflights) }
}
}
}
RawFetchState::Notify { res, notifiers } => {
handle_try! { *this.state, handle_notify(res.take().unwrap(), notifiers) }
}
RawFetchState::Ready => return Poll::Ready(()),
}
}
}
}
impl<E, S, I, C> RawFetch<E, S, I, C>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
C: Any + Send + 'static,
{
#[expect(clippy::type_complexity)]
fn try_set_optional(
optional_fetch_builder: &mut Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
required_fetch_builder: &mut Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
ctx: &mut C,
) -> Try<E, S, I, C> {
match optional_fetch_builder.take() {
None => Try::Noop,
Some(optional_fetch_builder) => {
let optional_fetch = optional_fetch_builder(ctx);
Try::SetStateAndContinue(RawFetchState::FetchOptional {
optional_fetch,
required_fetch_builder: required_fetch_builder.take(),
})
}
}
}
#[expect(clippy::type_complexity)]
fn try_set_required(
required_fetch_builder: &mut Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
ctx: &mut C,
id: usize,
hash: u64,
key: &E::Key,
inflights: &Arc<Mutex<InflightManager<E, S, I>>>,
res_no_fetch: Result<Option<RawCacheEntry<E, S, I>>>,
) -> Try<E, S, I, C> {
match required_fetch_builder.take() {
None => {}
Some(required_fetch_builder) => {
let required_fetch = required_fetch_builder(ctx);
return Try::SetStateAndContinue(RawFetchState::FetchRequired { required_fetch });
}
}
let fetch_or_take = match inflights.lock().fetch_or_take(hash, key, id) {
Some(fetch_or_take) => fetch_or_take,
None => return Try::Ready,
};
match fetch_or_take {
FetchOrTake::Fetch(required_fetch_builder) => {
let required_fetch = required_fetch_builder(ctx);
Try::SetStateAndContinue(RawFetchState::FetchRequired { required_fetch })
}
FetchOrTake::Notifiers(notifiers) => Try::SetStateAndContinue(RawFetchState::Notify {
res: Some(res_no_fetch),
notifiers,
}),
}
}
fn handle_target(
target: FetchTarget<E::Key, E::Value, E::Properties>,
key: &mut Once<E::Key>,
cache: &RawCache<E, S, I>,
source: Source,
) -> Try<E, S, I, C> {
match target {
FetchTarget::Entry { value, properties } => {
let key = key.take().unwrap();
cache.insert_with_properties_inner(key, value, properties, source);
}
FetchTarget::Piece(piece) => {
cache.insert_piece(piece);
}
}
Try::Ready
}
fn handle_error(
e: Error,
id: usize,
hash: u64,
key: &E::Key,
inflights: &Arc<Mutex<InflightManager<E, S, I>>>,
) -> Try<E, S, I, C> {
let notifiers = match inflights.lock().take(hash, key, Some(id)) {
Some(notifiers) => notifiers,
None => {
return Try::Ready;
}
};
Try::SetStateAndContinue(RawFetchState::Notify {
res: Some(Err(e)),
notifiers,
})
}
#[expect(clippy::type_complexity)]
fn handle_notify(
res: Result<Option<RawCacheEntry<E, S, I>>>,
notifiers: &mut Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
) -> Try<E, S, I, C> {
match res {
Ok(e) => {
for notifier in notifiers.drain(..) {
let _ = notifier.send(Ok(e.clone()));
}
}
Err(e) => {
for notifier in notifiers.drain(..) {
let _ = notifier.send(Err(e.clone()));
}
}
}
Try::Ready
}
}
#[pinned_drop]
impl<E, S, I, C> PinnedDrop for RawFetch<E, S, I, C>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
fn drop(self: Pin<&mut Self>) {
let this = self.project();
match this.state {
RawFetchState::Notify { .. } | RawFetchState::Ready => return,
RawFetchState::Init { .. } | RawFetchState::FetchOptional { .. } | RawFetchState::FetchRequired { .. } => {}
}
if let Some(notifiers) = this
.inflights
.lock()
.take(*this.hash, this.key.as_ref().unwrap(), Some(*this.id))
{
for notifier in notifiers {
let _ =
notifier
.send(Err(Error::new(ErrorKind::TaskCancelled, "fetch task cancelled")
.with_context("hash", *this.hash)));
}
}
}
}
#[cfg(test)]
mod tests {
use foyer_common::hasher::ModHasher;
use rand::{rngs::SmallRng, seq::IndexedRandom, RngCore, SeedableRng};
use super::*;
use crate::{
eviction::{
fifo::{Fifo, FifoConfig},
lfu::{Lfu, LfuConfig},
lru::{Lru, LruConfig},
s3fifo::{S3Fifo, S3FifoConfig},
sieve::{Sieve, SieveConfig},
test_utils::TestProperties,
},
indexer::hash_table::HashTableIndexer,
test_utils::PiecePipe,
};
fn is_send_sync_static<T: Send + Sync + 'static>() {}
#[test]
fn test_send_sync_static() {
is_send_sync_static::<RawCache<Fifo<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
is_send_sync_static::<RawCache<S3Fifo<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
is_send_sync_static::<RawCache<Lfu<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
is_send_sync_static::<RawCache<Lru<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
is_send_sync_static::<RawCache<Sieve<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
}
#[expect(clippy::type_complexity)]
fn fifo_cache_for_test(
) -> RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Fifo<u64, u64, TestProperties>>> {
RawCache::new(RawCacheConfig {
capacity: 256,
shards: 4,
eviction_config: FifoConfig::default(),
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
filter: Arc::new(|_, _| true),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
})
}
#[expect(clippy::type_complexity)]
fn s3fifo_cache_for_test(
) -> RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<S3Fifo<u64, u64, TestProperties>>> {
RawCache::new(RawCacheConfig {
capacity: 256,
shards: 4,
eviction_config: S3FifoConfig::default(),
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
filter: Arc::new(|_, _| true),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
})
}
#[expect(clippy::type_complexity)]
fn lru_cache_for_test(
) -> RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lru<u64, u64, TestProperties>>> {
RawCache::new(RawCacheConfig {
capacity: 256,
shards: 4,
eviction_config: LruConfig::default(),
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
filter: Arc::new(|_, _| true),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
})
}
#[expect(clippy::type_complexity)]
fn lfu_cache_for_test(
) -> RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lfu<u64, u64, TestProperties>>> {
RawCache::new(RawCacheConfig {
capacity: 256,
shards: 4,
eviction_config: LfuConfig::default(),
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
filter: Arc::new(|_, _| true),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
})
}
#[expect(clippy::type_complexity)]
fn sieve_cache_for_test(
) -> RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Sieve<u64, u64, TestProperties>>> {
RawCache::new(RawCacheConfig {
capacity: 256,
shards: 4,
eviction_config: SieveConfig {},
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
filter: Arc::new(|_, _| true),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
})
}
#[test_log::test]
fn test_insert_phantom() {
let fifo = fifo_cache_for_test();
let e1 = fifo.insert_with_properties(1, 1, TestProperties::default().with_phantom(true));
assert_eq!(fifo.usage(), 0);
drop(e1);
assert_eq!(fifo.usage(), 0);
let e2a = fifo.insert_with_properties(2, 2, TestProperties::default().with_phantom(true));
assert_eq!(fifo.usage(), 0);
assert!(fifo.get(&2).is_none());
assert_eq!(fifo.usage(), 0);
drop(e2a);
assert_eq!(fifo.usage(), 0);
let fifo = fifo_cache_for_test();
fifo.insert(1, 1);
assert_eq!(fifo.usage(), 1);
assert_eq!(fifo.get(&1).unwrap().value(), &1);
let e2 = fifo.insert_with_properties(1, 100, TestProperties::default().with_phantom(true));
assert_eq!(fifo.usage(), 0);
drop(e2);
assert_eq!(fifo.usage(), 0);
assert!(fifo.get(&1).is_none());
}
#[expect(clippy::type_complexity)]
#[test_log::test]
fn test_insert_filter() {
let fifo: RawCache<
Fifo<u64, u64, TestProperties>,
ModHasher,
HashTableIndexer<Fifo<u64, u64, TestProperties>>,
> = RawCache::new(RawCacheConfig {
capacity: 256,
shards: 4,
eviction_config: FifoConfig::default(),
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
filter: Arc::new(|k, _| !matches!(*k, 42)),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
});
fifo.insert(1, 1);
fifo.insert(2, 2);
fifo.insert(42, 42);
assert_eq!(fifo.usage(), 2);
assert_eq!(fifo.get(&1).unwrap().value(), &1);
assert_eq!(fifo.get(&2).unwrap().value(), &2);
assert!(fifo.get(&42).is_none());
}
#[test]
fn test_evict_all() {
let pipe = Arc::new(PiecePipe::default());
let fifo = fifo_cache_for_test().with_pipe(pipe.clone());
for i in 0..fifo.capacity() as _ {
fifo.insert(i, i);
}
assert_eq!(fifo.usage(), fifo.capacity());
fifo.evict_all();
let mut pieces = pipe
.pieces()
.iter()
.map(|p| (p.hash(), *p.key(), *p.value()))
.collect_vec();
pieces.sort_by_key(|t| t.0);
let expected = (0..fifo.capacity() as u64).map(|i| (i, i, i)).collect_vec();
assert_eq!(pieces, expected);
}
#[test]
fn test_insert_size_over_capacity() {
let cache: RawCache<Fifo<Vec<u8>, Vec<u8>, TestProperties>, ModHasher, HashTableIndexer<_>> =
RawCache::new(RawCacheConfig {
capacity: 4 * 1024, shards: 1,
eviction_config: FifoConfig::default(),
hash_builder: Default::default(),
weighter: Arc::new(|k, v| k.len() + v.len()),
filter: Arc::new(|_, _| true),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
});
let key = vec![b'k'; 1024]; let value = vec![b'v'; 5 * 1024];
cache.insert(key.clone(), value.clone());
assert_eq!(cache.usage(), 6 * 1024);
assert_eq!(cache.get(&key).unwrap().value(), &value);
}
#[test]
fn test_capacity_distribution_without_loss() {
let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
RawCache::new(RawCacheConfig {
capacity: 3,
shards: 2,
eviction_config: FifoConfig::default(),
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
filter: Arc::new(|_, _| true),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
});
for key in 0..3 {
let entry = cache.insert(key, key);
drop(entry);
}
assert_eq!(cache.usage(), 3);
for key in 0..3 {
let entry = cache.get(&key).expect("entry should exist");
assert_eq!(*entry, key);
drop(entry);
}
}
#[test]
fn test_capacity_distribution_with_more_shards_than_capacity() {
let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
RawCache::new(RawCacheConfig {
capacity: 2,
shards: 4,
eviction_config: FifoConfig::default(),
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
filter: Arc::new(|_, _| true),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
});
for key in 0..2 {
let entry = cache.insert(key, key);
drop(entry);
}
assert_eq!(cache.usage(), 2);
for key in 0..2 {
let entry = cache.get(&key).expect("entry should exist");
assert_eq!(*entry, key);
drop(entry);
}
assert!(cache.get(&2).is_none());
}
fn test_resize<E>(cache: &RawCache<E, ModHasher, HashTableIndexer<E>>)
where
E: Eviction<Key = u64, Value = u64>,
{
let capacity = cache.capacity();
for i in 0..capacity as u64 * 2 {
cache.insert(i, i);
}
assert_eq!(cache.usage(), capacity);
cache.resize(capacity / 2).unwrap();
assert_eq!(cache.usage(), capacity / 2);
for i in 0..capacity as u64 * 2 {
cache.insert(i, i);
}
assert_eq!(cache.usage(), capacity / 2);
}
#[test]
fn test_fifo_cache_resize() {
let cache = fifo_cache_for_test();
test_resize(&cache);
}
#[test]
fn test_s3fifo_cache_resize() {
let cache = s3fifo_cache_for_test();
test_resize(&cache);
}
#[test]
fn test_lru_cache_resize() {
let cache = lru_cache_for_test();
test_resize(&cache);
}
#[test]
fn test_lfu_cache_resize() {
let cache = lfu_cache_for_test();
test_resize(&cache);
}
#[test]
fn test_sieve_cache_resize() {
let cache = sieve_cache_for_test();
test_resize(&cache);
}
mod fuzzy {
use foyer_common::properties::Hint;
use super::*;
fn fuzzy<E, S>(cache: RawCache<E, S, HashTableIndexer<E>>, hints: Vec<Hint>)
where
E: Eviction<Key = u64, Value = u64, Properties = TestProperties>,
S: HashBuilder,
{
let handles = (0..8)
.map(|i| {
let c = cache.clone();
let hints = hints.clone();
std::thread::spawn(move || {
let mut rng = SmallRng::seed_from_u64(i);
for _ in 0..100000 {
let key = rng.next_u64();
if let Some(entry) = c.get(&key) {
assert_eq!(key, *entry);
drop(entry);
continue;
}
let hint = hints.choose(&mut rng).cloned().unwrap();
c.insert_with_properties(key, key, TestProperties::default().with_hint(hint));
}
})
})
.collect_vec();
handles.into_iter().for_each(|handle| handle.join().unwrap());
assert_eq!(cache.usage(), cache.capacity());
}
#[test_log::test]
fn test_fifo_cache_fuzzy() {
let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
RawCache::new(RawCacheConfig {
capacity: 256,
shards: 4,
eviction_config: FifoConfig::default(),
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
filter: Arc::new(|_, _| true),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
});
let hints = vec![Hint::Normal];
fuzzy(cache, hints);
}
#[test_log::test]
fn test_s3fifo_cache_fuzzy() {
let cache: RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
RawCache::new(RawCacheConfig {
capacity: 256,
shards: 4,
eviction_config: S3FifoConfig::default(),
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
filter: Arc::new(|_, _| true),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
});
let hints = vec![Hint::Normal];
fuzzy(cache, hints);
}
#[test_log::test]
fn test_lru_cache_fuzzy() {
let cache: RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
RawCache::new(RawCacheConfig {
capacity: 256,
shards: 4,
eviction_config: LruConfig::default(),
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
filter: Arc::new(|_, _| true),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
});
let hints = vec![Hint::Normal, Hint::Low];
fuzzy(cache, hints);
}
#[test_log::test]
fn test_lfu_cache_fuzzy() {
let cache: RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
RawCache::new(RawCacheConfig {
capacity: 256,
shards: 4,
eviction_config: LfuConfig::default(),
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
filter: Arc::new(|_, _| true),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
});
let hints = vec![Hint::Normal];
fuzzy(cache, hints);
}
#[test_log::test]
fn test_sieve_cache_fuzzy() {
let cache: RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
RawCache::new(RawCacheConfig {
capacity: 256,
shards: 4,
eviction_config: SieveConfig {},
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
filter: Arc::new(|_, _| true),
event_listener: None,
metrics: Arc::new(Metrics::noop()),
});
let hints = vec![Hint::Normal];
fuzzy(cache, hints);
}
}
}