#[cfg(feature = "tracing")]
use std::time::Duration;
use std::{
borrow::Cow,
fmt::Debug,
future::Future,
hash::Hash,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{ready, Context, Poll},
time::Instant,
};
use equivalent::Equivalent;
#[cfg(feature = "tracing")]
use fastrace::prelude::*;
#[cfg(feature = "tracing")]
use foyer_common::tracing::{TracingConfig, TracingOptions};
use foyer_common::{
code::{DefaultHasher, HashBuilder, StorageKey, StorageValue},
error::{Error, ErrorKind, Result},
metrics::Metrics,
properties::{Age, Hint, Location, Properties, Source},
rate::RateLimiter,
};
use foyer_memory::{Cache, CacheEntry, FetchTarget, GetOrFetch, Piece, Pipe};
use foyer_storage::{Load, Populated, Statistics, Store};
use futures_util::FutureExt as _;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use crate::hybrid::{
builder::HybridCacheBuilder,
writer::{HybridCacheStorageWriter, HybridCacheWriter},
};
#[cfg(feature = "tracing")]
macro_rules! root_span {
($self:ident, mut $name:ident, $label:expr) => {
root_span!($self, (mut) $name, $label)
};
($self:ident, $name:ident, $label:expr) => {
root_span!($self, () $name, $label)
};
($self:ident, ($($mut:tt)?) $name:ident, $label:expr) => {
let $name = if $self.inner.tracing.load(std::sync::atomic::Ordering::Relaxed) {
Span::root($label, SpanContext::random())
} else {
Span::noop()
};
};
}
#[cfg(not(feature = "tracing"))]
macro_rules! root_span {
($self:ident, mut $name:ident, $label:expr) => {};
($self:ident, $name:ident, $label:expr) => {};
($self:ident, ($($mut:tt)?) $name:ident, $label:expr) => {};
}
#[cfg(feature = "tracing")]
macro_rules! try_cancel {
($span:expr, $threshold:expr) => {
if let Some(elapsed) = $span.elapsed() {
if elapsed < $threshold {
$span.cancel();
}
}
};
}
#[cfg(not(feature = "tracing"))]
macro_rules! try_cancel {
($span:expr, $threshold:expr) => {};
}
#[derive(Debug, Clone, Default)]
pub struct HybridCacheProperties {
phantom: bool,
hint: Hint,
location: Location,
age: Age,
}
impl HybridCacheProperties {
fn with_phantom(mut self, phantom: bool) -> Self {
self.phantom = phantom;
self
}
fn phantom(&self) -> bool {
self.phantom
}
pub fn with_hint(mut self, hint: Hint) -> Self {
self.hint = hint;
self
}
pub fn hint(&self) -> Hint {
self.hint
}
pub fn with_location(mut self, location: Location) -> Self {
self.location = location;
self
}
pub fn location(&self) -> Location {
self.location
}
pub fn age(&self) -> Age {
self.age
}
}
impl Properties for HybridCacheProperties {
fn with_phantom(self, phantom: bool) -> Self {
self.with_phantom(phantom)
}
fn phantom(&self) -> Option<bool> {
Some(self.phantom())
}
fn with_hint(self, hint: Hint) -> Self {
self.with_hint(hint)
}
fn hint(&self) -> Option<Hint> {
Some(self.hint())
}
fn with_location(self, location: Location) -> Self {
self.with_location(location)
}
fn location(&self) -> Option<Location> {
Some(self.location())
}
fn with_age(mut self, age: Age) -> Self {
self.age = age;
self
}
fn age(&self) -> Option<Age> {
Some(self.age())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum HybridCachePolicy {
#[default]
WriteOnEviction,
WriteOnInsertion,
}
pub struct HybridCachePipe<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
store: Store<K, V, S, HybridCacheProperties>,
}
impl<K, V, S> Debug for HybridCachePipe<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HybridCachePipe").finish()
}
}
impl<K, V, S> HybridCachePipe<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
pub fn new(store: Store<K, V, S, HybridCacheProperties>) -> Self {
Self { store }
}
}
impl<K, V, S> Pipe for HybridCachePipe<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
type Key = K;
type Value = V;
type Properties = HybridCacheProperties;
fn is_enabled(&self) -> bool {
true
}
fn send(&self, piece: Piece<Self::Key, Self::Value, HybridCacheProperties>) {
match piece.properties().location() {
Location::InMem => return,
Location::Default | Location::OnDisk => {}
}
self.store.enqueue(piece, false);
}
fn flush(
&self,
pieces: Vec<Piece<Self::Key, Self::Value, HybridCacheProperties>>,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
let store = self.store.clone();
Box::pin(async move {
store.wait().await;
let device = store.device();
let throttler = device
.statistics()
.throttle()
.write_throughput
.map(|v| RateLimiter::new(v.get() as _));
for piece in pieces {
let bytes = store.entry_estimated_size(piece.key(), piece.value());
if let Some(throttler) = &throttler {
let wait = throttler.consume(bytes as _);
if !wait.is_zero() {
tokio::time::sleep(wait).await
}
}
store.enqueue(piece, false);
}
})
}
}
pub type HybridCacheEntry<K, V, S = DefaultHasher> = CacheEntry<K, V, S, HybridCacheProperties>;
#[derive(Debug)]
pub struct HybridCacheOptions {
pub policy: HybridCachePolicy,
pub flush_on_close: bool,
#[cfg(feature = "tracing")]
pub tracing_options: TracingOptions,
}
impl Default for HybridCacheOptions {
fn default() -> Self {
Self {
policy: HybridCachePolicy::default(),
flush_on_close: true,
#[cfg(feature = "tracing")]
tracing_options: TracingOptions::default(),
}
}
}
struct Inner<K, V, S = DefaultHasher>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
name: Cow<'static, str>,
policy: HybridCachePolicy,
flush_on_close: bool,
metrics: Arc<Metrics>,
closed: Arc<AtomicBool>,
memory: Cache<K, V, S, HybridCacheProperties>,
storage: Store<K, V, S, HybridCacheProperties>,
#[cfg(feature = "tracing")]
tracing: std::sync::atomic::AtomicBool,
#[cfg(feature = "tracing")]
tracing_config: TracingConfig,
}
impl<K, V, S> Inner<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
async fn close_inner(
closed: Arc<AtomicBool>,
memory: Cache<K, V, S, HybridCacheProperties>,
storage: Store<K, V, S, HybridCacheProperties>,
flush_on_close: bool,
) -> Result<()> {
if closed.fetch_or(true, Ordering::Relaxed) {
return Ok(());
}
let now = Instant::now();
if flush_on_close {
let bytes = memory.usage();
tracing::info!(bytes, "[hybrid]: flush all in-memory cached entries to disk on close");
memory.flush().await;
}
storage.close().await?;
let elapsed = now.elapsed();
tracing::info!("[hybrid]: close consumes {elapsed:?}");
Ok(())
}
async fn close(&self) -> Result<()> {
Self::close_inner(
self.closed.clone(),
self.memory.clone(),
self.storage.clone(),
self.flush_on_close,
)
.await
}
}
impl<K, V, S> Drop for Inner<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
fn drop(&mut self) {
let name = self.name.clone();
let closed = self.closed.clone();
let memory = self.memory.clone();
let storage = self.storage.clone();
let flush_on_close = self.flush_on_close;
self.storage.spawner().spawn(async move {
if let Err(e) = Self::close_inner(closed, memory, storage, flush_on_close).await {
tracing::error!(?name, ?e, "[hybrid]: failed to close hybrid cache");
}
});
}
}
pub struct HybridCache<K, V, S = DefaultHasher>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
inner: Arc<Inner<K, V, S>>,
}
impl<K, V, S> Debug for HybridCache<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut r = f.debug_struct("HybridCache");
r.field("policy", &self.inner.policy)
.field("flush_on_close", &self.inner.flush_on_close)
.field("memory", &self.inner.memory)
.field("storage", &self.inner.storage);
#[cfg(feature = "tracing")]
r.field("tracing", &self.inner.tracing)
.field("tracing_config", &self.inner.tracing_config);
r.finish()
}
}
impl<K, V, S> Clone for HybridCache<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<K, V> HybridCache<K, V, DefaultHasher>
where
K: StorageKey,
V: StorageValue,
{
pub fn builder() -> HybridCacheBuilder<K, V> {
HybridCacheBuilder::new()
}
}
impl<K, V, S> HybridCache<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
pub(crate) fn new(
name: Cow<'static, str>,
options: HybridCacheOptions,
memory: Cache<K, V, S, HybridCacheProperties>,
storage: Store<K, V, S, HybridCacheProperties>,
metrics: Arc<Metrics>,
) -> Self {
let policy = options.policy;
let flush_on_close = options.flush_on_close;
#[cfg(feature = "tracing")]
let tracing_config = {
let cfg = TracingConfig::default();
cfg.update(options.tracing_options);
cfg
};
#[cfg(feature = "tracing")]
let tracing = std::sync::atomic::AtomicBool::new(false);
let closed = Arc::new(AtomicBool::new(false));
let inner = Inner {
name,
policy,
flush_on_close,
closed,
memory,
storage,
metrics,
#[cfg(feature = "tracing")]
tracing,
#[cfg(feature = "tracing")]
tracing_config,
};
let inner = Arc::new(inner);
Self { inner }
}
pub fn name(&self) -> &str {
&self.inner.name
}
pub fn policy(&self) -> HybridCachePolicy {
self.inner.policy
}
#[cfg(feature = "tracing")]
pub fn update_tracing_options(&self, options: TracingOptions) {
self.inner.tracing_config.update(options);
}
pub fn memory(&self) -> &Cache<K, V, S, HybridCacheProperties> {
&self.inner.memory
}
pub fn storage(&self) -> &Store<K, V, S, HybridCacheProperties> {
&self.inner.storage
}
#[cfg(feature = "tracing")]
pub fn enable_tracing(&self) {
self.inner.tracing.store(true, std::sync::atomic::Ordering::Relaxed);
}
#[cfg(feature = "tracing")]
pub fn disable_tracing(&self) {
self.inner.tracing.store(true, std::sync::atomic::Ordering::Relaxed);
}
#[cfg(feature = "tracing")]
pub fn is_tracing_enabled(&self) -> bool {
self.inner.tracing.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn insert(&self, key: K, value: V) -> HybridCacheEntry<K, V, S> {
root_span!(self, span, "foyer::hybrid::cache::insert");
#[cfg(feature = "tracing")]
let _guard = span.set_local_parent();
let now = Instant::now();
let entry = self.inner.memory.insert(key, value);
if self.inner.policy == HybridCachePolicy::WriteOnInsertion {
self.inner.storage.enqueue(entry.piece(), false);
}
self.inner.metrics.hybrid_insert.increase(1);
self.inner
.metrics
.hybrid_insert_duration
.record(now.elapsed().as_secs_f64());
try_cancel!(span, self.inner.tracing_config.record_hybrid_insert_threshold());
entry
}
pub fn insert_with_properties(
&self,
key: K,
value: V,
properties: HybridCacheProperties,
) -> HybridCacheEntry<K, V, S> {
root_span!(self, span, "foyer::hybrid::cache::insert");
#[cfg(feature = "tracing")]
let _guard = span.set_local_parent();
let now = Instant::now();
let entry = self.inner.memory.insert_with_properties(key, value, properties);
if self.inner.policy == HybridCachePolicy::WriteOnInsertion && entry.properties().location() != Location::InMem
{
self.inner.storage.enqueue(entry.piece(), false);
}
self.inner.metrics.hybrid_insert.increase(1);
self.inner
.metrics
.hybrid_insert_duration
.record(now.elapsed().as_secs_f64());
try_cancel!(span, self.inner.tracing_config.record_hybrid_insert_threshold());
entry
}
pub fn remove<Q>(&self, key: &Q)
where
Q: Hash + Equivalent<K> + ?Sized + Send + Sync + 'static,
{
root_span!(self, span, "foyer::hybrid::cache::remove");
#[cfg(feature = "tracing")]
let _guard = span.set_local_parent();
let now = Instant::now();
self.inner.memory.remove(key);
self.inner.storage.delete(key);
self.inner.metrics.hybrid_remove.increase(1);
self.inner
.metrics
.hybrid_remove_duration
.record(now.elapsed().as_secs_f64());
try_cancel!(span, self.inner.tracing_config.record_hybrid_remove_threshold());
}
pub fn contains<Q>(&self, key: &Q) -> bool
where
Q: Hash + Equivalent<K> + ?Sized,
{
self.inner.memory.contains(key) || self.inner.storage.may_contains(key)
}
pub async fn clear(&self) -> Result<()> {
self.inner.memory.clear();
self.inner.storage.destroy().await?;
Ok(())
}
pub async fn close(&self) -> Result<()> {
self.inner.close().await
}
pub fn statistics(&self) -> &Arc<Statistics> {
self.inner.storage.statistics()
}
pub fn writer(&self, key: K) -> HybridCacheWriter<K, V, S> {
HybridCacheWriter::new(self.clone(), key)
}
pub fn storage_writer(&self, key: K) -> HybridCacheStorageWriter<K, V, S> {
HybridCacheStorageWriter::new(self.clone(), key)
}
pub fn is_hybrid(&self) -> bool {
self.inner.storage.is_enabled()
}
pub(crate) fn metrics(&self) -> &Arc<Metrics> {
&self.inner.metrics
}
}
impl<K, V, S> HybridCache<K, V, S>
where
K: StorageKey + Clone,
V: StorageValue,
S: HashBuilder + Debug,
{
pub fn get<Q>(&self, key: &Q) -> HybridGet<K, V, S>
where
Q: Hash + Equivalent<K> + ?Sized + ToOwned<Owned = K>,
{
root_span!(self, span, "foyer::hybrid::cache::get");
let start = Instant::now();
let ctx = Arc::new(GetOrFetchCtx::default());
let store = self.inner.storage.clone();
let spawner = self.inner.storage.spawner();
let inner = self.inner.memory.get_or_fetch_inner(
key,
|| {
let key = key.to_owned();
Some(Box::new(|ctx: &mut Arc<GetOrFetchCtx>| {
let ctx = ctx.clone();
async move {
match store.load(&key).await {
Ok(Load::Entry {
key: _,
value,
populated: Populated { age },
}) => {
let properties = HybridCacheProperties::default().with_age(age);
Ok(Some(FetchTarget::Entry { value, properties }))
}
Ok(Load::Piece { piece, populated: _ }) => Ok(Some(FetchTarget::Piece(piece))),
Ok(Load::Throttled) => {
ctx.throttled.store(true, Ordering::Relaxed);
Ok(None)
}
Ok(Load::Miss) => Ok(None),
Err(e) => Err(e),
}
}
.boxed()
}))
},
|| None,
ctx,
spawner,
);
let metrics = self.inner.metrics.clone();
#[cfg(feature = "tracing")]
let span_cancel_threshold = self.inner.tracing_config.record_hybrid_get_threshold();
HybridGet {
inner,
metrics,
start,
#[cfg(feature = "tracing")]
span,
#[cfg(feature = "tracing")]
span_cancel_threshold,
}
}
pub fn get_or_fetch<Q, F, FU, IT, ER>(&self, key: &Q, fetch: F) -> HybridGetOrFetch<K, V, S>
where
Q: Hash + Equivalent<K> + ?Sized + ToOwned<Owned = K>,
F: FnOnce() -> FU,
FU: Future<Output = std::result::Result<IT, ER>> + Send + 'static,
IT: Into<FetchTarget<K, V, HybridCacheProperties>>,
ER: Into<anyhow::Error>,
{
root_span!(self, span, "foyer::hybrid::cache::get_or_fetch");
let start = Instant::now();
let ctx = Arc::new(GetOrFetchCtx::default());
let store = self.inner.storage.clone();
let spawner = self.inner.storage.spawner();
let fut = fetch();
let inner = self.inner.memory.get_or_fetch_inner(
key,
|| {
let key = key.to_owned();
Some(Box::new(|ctx: &mut Arc<GetOrFetchCtx>| {
let ctx = ctx.clone();
async move {
let load = store.load(&key).await;
tracing::trace!(load = ?load, "[hybrid]: loaded from disk cache");
match load {
Ok(Load::Entry {
key: _,
value,
populated: Populated { age },
}) => {
let properties = HybridCacheProperties::default().with_age(age);
Ok(Some(FetchTarget::Entry { value, properties }))
}
Ok(Load::Piece { piece, populated: _ }) => Ok(Some(FetchTarget::Piece(piece))),
Ok(Load::Throttled) => {
ctx.throttled.store(true, Ordering::Relaxed);
Ok(None)
}
Ok(Load::Miss) => Ok(None),
Err(e) => Err(e),
}
}
.boxed()
}))
},
|| {
Some(Box::new(|ctx| {
let ctx = ctx.clone();
async move {
match fut.await {
Ok(it) => {
let target = it.into();
let target = match target {
FetchTarget::Entry { value, mut properties } => {
properties = properties.with_age(Age::Fresh);
if ctx.throttled.load(Ordering::Relaxed) {
properties = properties.with_location(Location::InMem);
}
FetchTarget::Entry { value, properties }
}
_ => target,
};
Ok(target)
}
Err(e) => Err(Error::new(ErrorKind::External, "fetch failed").with_source(e)),
}
}
.boxed()
}))
},
ctx.clone(),
spawner,
);
let policy = self.inner.policy;
let store = self.inner.storage.clone();
let metrics = self.inner.metrics.clone();
#[cfg(feature = "tracing")]
let span_cancel_threshold = self.inner.tracing_config.record_hybrid_get_or_fetch_threshold();
HybridGetOrFetch {
inner,
policy,
store,
ctx,
metrics,
start,
#[cfg(feature = "tracing")]
span,
#[cfg(feature = "tracing")]
span_cancel_threshold,
}
}
}
#[must_use]
#[pin_project]
pub struct HybridGet<K, V, S = DefaultHasher>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
#[pin]
inner: GetOrFetch<K, V, S, HybridCacheProperties>,
metrics: Arc<Metrics>,
start: Instant,
#[cfg(feature = "tracing")]
span: Span,
#[cfg(feature = "tracing")]
span_cancel_threshold: Duration,
}
impl<K, V, S> Debug for HybridGet<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HybridGet").field("inner", &self.inner).finish()
}
}
impl<K, V, S> Future for HybridGet<K, V, S>
where
K: StorageKey + Clone,
V: StorageValue,
S: HashBuilder + Debug,
{
type Output = Result<Option<HybridCacheEntry<K, V, S>>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
#[cfg(feature = "tracing")]
let _guard = this.span.set_local_parent();
let res = ready!(this.inner.poll_inner(cx));
match res.as_ref() {
Ok(Some(_)) => {
this.metrics.hybrid_hit.increase(1);
this.metrics
.hybrid_hit_duration
.record(this.start.elapsed().as_secs_f64());
}
Ok(None) => {
this.metrics.hybrid_miss.increase(1);
this.metrics
.hybrid_miss_duration
.record(this.start.elapsed().as_secs_f64());
}
Err(_) => {
this.metrics.hybrid_error.increase(1);
this.metrics
.hybrid_error_duration
.record(this.start.elapsed().as_secs_f64());
}
}
try_cancel!(this.span, *this.span_cancel_threshold);
Poll::Ready(res)
}
}
impl<K, V, S> HybridGet<K, V, S>
where
K: StorageKey + Clone,
V: StorageValue,
S: HashBuilder + Debug,
{
pub fn need_await(&self) -> bool {
self.inner.need_await()
}
#[expect(clippy::allow_attributes)]
#[allow(clippy::result_large_err)]
pub fn try_unwrap(self) -> std::result::Result<HybridCacheEntry<K, V, S>, Self> {
self.inner.try_unwrap().map_err(|inner| Self {
inner,
metrics: self.metrics,
start: self.start,
#[cfg(feature = "tracing")]
span: self.span,
#[cfg(feature = "tracing")]
span_cancel_threshold: self.span_cancel_threshold,
})
}
}
#[derive(Debug, Default)]
struct GetOrFetchCtx {
throttled: AtomicBool,
}
#[must_use]
#[pin_project]
pub struct HybridGetOrFetch<K, V, S = DefaultHasher>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
#[pin]
inner: GetOrFetch<K, V, S, HybridCacheProperties>,
policy: HybridCachePolicy,
store: Store<K, V, S, HybridCacheProperties>,
ctx: Arc<GetOrFetchCtx>,
metrics: Arc<Metrics>,
start: Instant,
#[cfg(feature = "tracing")]
span: Span,
#[cfg(feature = "tracing")]
span_cancel_threshold: Duration,
}
impl<K, V, S> Debug for HybridGetOrFetch<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HybridGetOrFetch").field("inner", &self.inner).finish()
}
}
impl<K, V, S> Future for HybridGetOrFetch<K, V, S>
where
K: StorageKey + Clone,
V: StorageValue,
S: HashBuilder + Debug,
{
type Output = Result<HybridCacheEntry<K, V, S>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
#[cfg(feature = "tracing")]
let _guard = this.span.set_local_parent();
let res = ready!(this.inner.poll(cx));
if let Ok(entry) = res.as_ref() {
if entry.properties().location() != Location::InMem
&& *this.policy == HybridCachePolicy::WriteOnInsertion
&& this.store.is_enabled()
&& !this.ctx.throttled.load(Ordering::Relaxed)
{
this.store.enqueue(entry.piece(), false);
}
}
match res.as_ref() {
Ok(e) => match e.source() {
Source::Outer => {
this.metrics.hybrid_miss.increase(1);
this.metrics
.hybrid_miss_duration
.record(this.start.elapsed().as_secs_f64());
}
Source::Memory | Source::Disk => {
this.metrics.hybrid_hit.increase(1);
this.metrics
.hybrid_hit_duration
.record(this.start.elapsed().as_secs_f64());
}
},
Err(_) => {
this.metrics.hybrid_error.increase(1);
this.metrics
.hybrid_error_duration
.record(this.start.elapsed().as_secs_f64());
}
}
try_cancel!(this.span, *this.span_cancel_threshold);
Poll::Ready(res)
}
}
impl<K, V, S> HybridGetOrFetch<K, V, S>
where
K: StorageKey + Clone,
V: StorageValue,
S: HashBuilder + Debug,
{
pub fn need_await(&self) -> bool {
self.inner.need_await()
}
#[expect(clippy::allow_attributes)]
#[allow(clippy::result_large_err)]
pub fn try_unwrap(self) -> std::result::Result<HybridCacheEntry<K, V, S>, Self> {
self.inner.try_unwrap().map_err(|inner| Self {
inner,
policy: self.policy,
store: self.store,
ctx: self.ctx,
metrics: self.metrics,
start: self.start,
#[cfg(feature = "tracing")]
span: self.span,
#[cfg(feature = "tracing")]
span_cancel_threshold: self.span_cancel_threshold,
})
}
}
#[cfg(test)]
mod tests {
use std::{path::Path, sync::Arc};
use foyer_common::{hasher::ModHasher, properties::Source};
use foyer_storage::{test_utils::*, StorageFilter};
use mea::barrier::Barrier;
use storage::test_utils::Biased;
use crate::*;
const KB: usize = 1024;
const MB: usize = 1024 * 1024;
async fn open(dir: impl AsRef<Path>) -> HybridCache<u64, Vec<u8>, ModHasher> {
open_with(dir, |b| b, |b| b).await
}
async fn open_with(
dir: impl AsRef<Path>,
hybrid_cache_builder_mapper: impl FnOnce(HybridCacheBuilder<u64, Vec<u8>>) -> HybridCacheBuilder<u64, Vec<u8>>,
block_engine_builder_mapper: impl FnOnce(
BlockEngineConfig<u64, Vec<u8>, HybridCacheProperties>,
) -> BlockEngineConfig<u64, Vec<u8>, HybridCacheProperties>,
) -> HybridCache<u64, Vec<u8>, ModHasher> {
let mut block_engine_builder =
BlockEngineConfig::new(FsDeviceBuilder::new(dir).with_capacity(16 * MB).build().unwrap())
.with_block_size(MB);
block_engine_builder = block_engine_builder_mapper(block_engine_builder);
let hybrid_cache_builder = HybridCacheBuilder::new().with_name("test");
let hybrid_cache_builder = hybrid_cache_builder_mapper(hybrid_cache_builder);
hybrid_cache_builder
.memory(4 * MB)
.with_hash_builder(ModHasher::default())
.storage()
.with_io_engine_config(PsyncIoEngineConfig::new())
.with_engine_config(block_engine_builder)
.build()
.await
.unwrap()
}
#[test_log::test(tokio::test)]
async fn test_hybrid_cache() {
let dir = tempfile::tempdir().unwrap();
let hybrid = open(dir.path()).await;
let e1 = hybrid.insert(1, vec![1; 7 * KB]);
let e2 = hybrid.insert_with_properties(2, vec![2; 7 * KB], HybridCacheProperties::default());
assert_eq!(e1.value(), &vec![1; 7 * KB]);
assert_eq!(e2.value(), &vec![2; 7 * KB]);
let e3 = hybrid.storage_writer(3).insert(vec![3; 7 * KB]).unwrap();
let e4 = hybrid
.storage_writer(4)
.insert_with_properties(vec![4; 7 * KB], HybridCacheProperties::default())
.unwrap();
assert_eq!(e3.value(), &vec![3; 7 * KB]);
assert_eq!(e4.value(), &vec![4; 7 * KB]);
let e5 = hybrid
.get_or_fetch(&5, || async move { Ok::<_, Error>(vec![5; 7 * KB]) })
.await
.unwrap();
assert_eq!(e5.value(), &vec![5; 7 * KB]);
let e1g = hybrid.get(&1).await.unwrap().unwrap();
assert_eq!(e1g.value(), &vec![1; 7 * KB]);
let e2g = hybrid.get(&2).await.unwrap().unwrap();
assert_eq!(e2g.value(), &vec![2; 7 * KB]);
assert!(hybrid.contains(&1));
hybrid.remove(&1);
assert!(!hybrid.contains(&1));
}
#[test_log::test(tokio::test)]
async fn test_hybrid_cache_writer() {
let dir = tempfile::tempdir().unwrap();
let hybrid = open_with(
dir.path(),
|b| b,
|b| b.with_admission_filter(StorageFilter::new().with_condition(Biased::new([1, 2, 3, 4]))),
)
.await;
let e1 = hybrid.writer(1).insert(vec![1; 7 * KB]);
let e2 = hybrid
.writer(2)
.insert_with_properties(vec![2; 7 * KB], HybridCacheProperties::default());
assert_eq!(e1.value(), &vec![1; 7 * KB]);
assert_eq!(e2.value(), &vec![2; 7 * KB]);
let e3 = hybrid.writer(3).storage().insert(vec![3; 7 * KB]).unwrap();
let e4 = hybrid
.writer(4)
.insert_with_properties(vec![4; 7 * KB], HybridCacheProperties::default());
assert_eq!(e3.value(), &vec![3; 7 * KB]);
assert_eq!(e4.value(), &vec![4; 7 * KB]);
let r5 = hybrid.writer(5).storage().insert(vec![5; 7 * KB]);
assert!(r5.is_none());
let e5 = hybrid.writer(5).storage().force().insert(vec![5; 7 * KB]).unwrap();
assert_eq!(e5.value(), &vec![5; 7 * KB]);
}
#[test_log::test(tokio::test)]
async fn test_hybrid_fetch_with_cache_location() {
let dir = tempfile::tempdir().unwrap();
let hybrid = open_with(dir.path(), |b| b.with_policy(HybridCachePolicy::WriteOnEviction), |b| b).await;
hybrid
.get_or_fetch(&1, || async move {
Ok::<_, Error>((
vec![1; 7 * KB],
HybridCacheProperties::default().with_location(Location::Default),
))
})
.await
.unwrap();
assert_eq!(hybrid.memory().get(&1).unwrap().value(), &vec![1; 7 * KB]);
hybrid.memory().evict_all();
hybrid.storage().wait().await;
assert_eq!(
hybrid.storage().load(&1).await.unwrap().entry().unwrap().1,
vec![1; 7 * KB]
);
hybrid
.get_or_fetch(&2, || async move {
Ok::<_, Error>((
vec![2; 7 * KB],
HybridCacheProperties::default().with_location(Location::InMem),
))
})
.await
.unwrap();
assert_eq!(hybrid.memory().get(&2).unwrap().value(), &vec![2; 7 * KB]);
hybrid.memory().evict_all();
hybrid.storage().wait().await;
assert!(hybrid.storage().load(&2).await.unwrap().is_miss());
hybrid
.get_or_fetch(&3, || async move {
Ok::<_, Error>((
vec![3; 7 * KB],
HybridCacheProperties::default().with_location(Location::OnDisk),
))
})
.await
.unwrap();
hybrid.storage().wait().await;
assert!(hybrid.memory().get(&3).is_none());
assert_eq!(
hybrid.storage().load(&3).await.unwrap().entry().unwrap().1,
vec![3; 7 * KB]
);
let dir = tempfile::tempdir().unwrap();
let hybrid = open_with(
dir.path(),
|b| b.with_policy(HybridCachePolicy::WriteOnInsertion),
|b| b,
)
.await;
hybrid
.get_or_fetch(&1, || async move {
Ok::<_, Error>((
vec![1; 7 * KB],
HybridCacheProperties::default().with_location(Location::Default),
))
})
.await
.unwrap();
hybrid.storage().wait().await;
assert_eq!(hybrid.memory().get(&1).unwrap().value(), &vec![1; 7 * KB]);
assert_eq!(
hybrid.storage().load(&1).await.unwrap().entry().unwrap().1,
vec![1; 7 * KB]
);
hybrid
.get_or_fetch(&2, || async move {
Ok::<_, Error>((
vec![2; 7 * KB],
HybridCacheProperties::default().with_location(Location::InMem),
))
})
.await
.unwrap();
hybrid.storage().wait().await;
assert_eq!(hybrid.memory().get(&2).unwrap().value(), &vec![2; 7 * KB]);
assert!(hybrid.storage().load(&2).await.unwrap().is_miss());
hybrid
.get_or_fetch(&3, || async move {
Ok::<_, Error>((
vec![3; 7 * KB],
HybridCacheProperties::default().with_location(Location::OnDisk),
))
})
.await
.unwrap();
hybrid.storage().wait().await;
assert!(hybrid.memory().get(&3).is_none());
assert_eq!(
hybrid.storage().load(&3).await.unwrap().entry().unwrap().1,
vec![3; 7 * KB]
);
}
#[test_log::test(tokio::test)]
async fn test_hybrid_insert_with_cache_location() {
let dir = tempfile::tempdir().unwrap();
let hybrid = open_with(dir.path(), |b| b.with_policy(HybridCachePolicy::WriteOnEviction), |b| b).await;
hybrid.insert_with_properties(
1,
vec![1; 7 * KB],
HybridCacheProperties::default().with_location(Location::Default),
);
assert_eq!(hybrid.memory().get(&1).unwrap().value(), &vec![1; 7 * KB]);
hybrid.memory().evict_all();
hybrid.storage().wait().await;
assert_eq!(
hybrid.storage().load(&1).await.unwrap().entry().unwrap().1,
vec![1; 7 * KB]
);
hybrid.insert_with_properties(
2,
vec![2; 7 * KB],
HybridCacheProperties::default().with_location(Location::InMem),
);
assert_eq!(hybrid.memory().get(&2).unwrap().value(), &vec![2; 7 * KB]);
hybrid.memory().evict_all();
hybrid.storage().wait().await;
assert!(hybrid.storage().load(&2).await.unwrap().is_miss());
hybrid.insert_with_properties(
3,
vec![3; 7 * KB],
HybridCacheProperties::default().with_location(Location::OnDisk),
);
hybrid.storage().wait().await;
assert!(hybrid.memory().get(&3).is_none());
assert_eq!(
hybrid.storage().load(&3).await.unwrap().entry().unwrap().1,
vec![3; 7 * KB]
);
let dir = tempfile::tempdir().unwrap();
let hybrid = open_with(
dir.path(),
|b| b.with_policy(HybridCachePolicy::WriteOnInsertion),
|b| b,
)
.await;
hybrid.insert_with_properties(
1,
vec![1; 7 * KB],
HybridCacheProperties::default().with_location(Location::Default),
);
hybrid.storage().wait().await;
assert_eq!(hybrid.memory().get(&1).unwrap().value(), &vec![1; 7 * KB]);
assert_eq!(
hybrid.storage().load(&1).await.unwrap().entry().unwrap().1,
vec![1; 7 * KB]
);
hybrid.insert_with_properties(
2,
vec![2; 7 * KB],
HybridCacheProperties::default().with_location(Location::InMem),
);
hybrid.storage().wait().await;
assert_eq!(hybrid.memory().get(&2).unwrap().value(), &vec![2; 7 * KB]);
assert!(hybrid.storage().load(&2).await.unwrap().is_miss());
hybrid.insert_with_properties(
3,
vec![3; 7 * KB],
HybridCacheProperties::default().with_location(Location::OnDisk),
);
hybrid.storage().wait().await;
assert!(hybrid.memory().get(&3).is_none());
assert_eq!(
hybrid.storage().load(&3).await.unwrap().entry().unwrap().1,
vec![3; 7 * KB]
);
}
#[test_log::test(tokio::test)]
async fn test_hybrid_read_throttled() {
let dir = tempfile::tempdir().unwrap();
let recorder = Recorder::default();
let hybrid = open_with(
dir.path(),
|b| b.with_policy(HybridCachePolicy::WriteOnInsertion),
|b| {
b.with_admission_filter(StorageFilter::new().with_condition(recorder.admission()))
.with_reinsertion_filter(StorageFilter::new().with_condition(recorder.eviction()))
},
)
.await;
hybrid.insert_with_properties(
1,
vec![1; 7 * KB],
HybridCacheProperties::default().with_location(Location::Default),
);
hybrid.storage().wait().await;
assert_eq!(hybrid.memory().get(&1).unwrap().value(), &vec![1; 7 * KB]);
assert_eq!(
hybrid.storage().load(&1).await.unwrap().entry().unwrap().1,
vec![1; 7 * KB]
);
let r = hybrid.memory().remove(&1);
assert!(r.is_some());
hybrid.storage().load_throttle_switch().throttle();
assert!(matches! {hybrid.storage().load(&1).await.unwrap(), Load::Throttled });
hybrid
.get_or_fetch(&1, || async move { Ok::<_, Error>(vec![1; 7 * KB]) })
.await
.unwrap();
hybrid.storage().wait().await;
assert_eq!(hybrid.memory().get(&1).unwrap().value(), &vec![1; 7 * KB]);
assert_eq!(recorder.dump(), vec![Record::Admit(1)]);
let dir = tempfile::tempdir().unwrap();
let recorder = Recorder::default();
let hybrid = open_with(
dir.path(),
|b| b.with_policy(HybridCachePolicy::WriteOnEviction),
|b| {
b.with_admission_filter(StorageFilter::new().with_condition(recorder.admission()))
.with_reinsertion_filter(StorageFilter::new().with_condition(recorder.eviction()))
},
)
.await;
hybrid.insert_with_properties(
1,
vec![1; 7 * KB],
HybridCacheProperties::default().with_location(Location::Default),
);
assert_eq!(hybrid.memory().get(&1).unwrap().value(), &vec![1; 7 * KB]);
hybrid.memory().evict_all();
hybrid.storage().wait().await;
assert_eq!(
hybrid.storage().load(&1).await.unwrap().entry().unwrap().1,
vec![1; 7 * KB]
);
hybrid.storage().load_throttle_switch().throttle();
assert!(matches! {hybrid.storage().load(&1).await.unwrap(), Load::Throttled });
hybrid
.get_or_fetch(&1, || async move { Ok::<_, Error>(vec![1; 7 * KB]) })
.await
.unwrap();
assert_eq!(hybrid.memory().get(&1).unwrap().value(), &vec![1; 7 * KB]);
hybrid.memory().evict_all();
hybrid.storage().wait().await;
assert_eq!(recorder.dump(), vec![Record::Admit(1)]);
}
#[test_log::test(tokio::test)]
async fn test_flush_on_close() {
let dir = tempfile::tempdir().unwrap();
let hybrid = open_with(dir.path(), |b| b.with_flush_on_close(false), |b| b).await;
hybrid.insert(1, vec![1; 7 * KB]);
assert!(hybrid.storage().load(&1).await.unwrap().is_miss());
hybrid.close().await.unwrap();
let hybrid = open_with(dir.path(), |b| b.with_flush_on_close(false), |b| b).await;
assert!(hybrid.storage().load(&1).await.unwrap().is_miss());
let dir = tempfile::tempdir().unwrap();
let hybrid = open_with(dir.path(), |b| b.with_flush_on_close(true), |b| b).await;
hybrid.insert(1, vec![1; 7 * KB]);
assert!(hybrid.storage().load(&1).await.unwrap().is_miss());
hybrid.close().await.unwrap();
let hybrid = open_with(dir.path(), |b| b.with_flush_on_close(true), |b| b).await;
assert_eq!(
hybrid.storage().load(&1).await.unwrap().kv().unwrap(),
(1, vec![1; 7 * KB])
);
}
#[test_log::test(tokio::test)]
async fn test_load_after_recovery() {
let open = |dir| async move {
HybridCacheBuilder::new()
.with_name("test")
.with_policy(HybridCachePolicy::WriteOnInsertion)
.memory(4 * MB)
.storage()
.with_io_engine_config(PsyncIoEngineConfig::new())
.with_engine_config(
BlockEngineConfig::new(FsDeviceBuilder::new(dir).with_capacity(16 * MB).build().unwrap())
.with_block_size(64 * KB),
)
.build()
.await
.unwrap()
};
let dir = tempfile::tempdir().unwrap();
let hybrid = open(&dir).await;
hybrid.insert(1, vec![1; 3 * KB]);
assert_eq!(*hybrid.get(&1).await.unwrap().unwrap(), vec![1; 3 * KB]);
hybrid.close().await.unwrap();
let hybrid = open(&dir).await;
assert_eq!(*hybrid.get(&1).await.unwrap().unwrap(), vec![1; 3 * KB]);
}
#[test_log::test(tokio::test)]
async fn test_concurrent_get_and_fetch() {
let dir = tempfile::tempdir().unwrap();
let load_holder = Holder::default();
let hybrid = open_with(dir.path(), |b| b, |b| b.with_load_holder(load_holder.clone())).await;
assert!(hybrid.get(&42).await.unwrap().is_none());
let barrier = Arc::new(Barrier::new(2));
load_holder.hold();
let get = hybrid.get(&42);
let b = barrier.clone();
let fetch = hybrid.get_or_fetch(&42, || async move {
b.wait().await;
Ok::<_, Error>(vec![b'x'; 42])
});
load_holder.unhold();
barrier.wait().await;
let r_get = get.await.unwrap();
let r_fetch = fetch.await.unwrap();
assert_eq!(r_get.unwrap().value(), r_fetch.value());
}
#[test_log::test(tokio::test)]
async fn test_entry_location() {
let dir = tempfile::tempdir().unwrap();
let flush_switch = foyer_storage::test_utils::Switch::default();
let hybrid = open_with(dir.path(), |b| b, |b| b.with_flush_switch(flush_switch.clone())).await;
let f1 = hybrid.get_or_fetch(&1, || async move { Ok::<_, Error>(vec![1; 7 * KB]) });
let f2 = hybrid.get_or_fetch(&1, || async move { Ok::<_, Error>(vec![1; 7 * KB]) });
let e1 = f1.await.unwrap();
let e2 = f2.await.unwrap();
assert_eq!(e1.source(), Source::Outer);
assert_eq!(e2.source(), Source::Outer);
drop(e1);
drop(e2);
let e3 = hybrid
.get_or_fetch(&1, || async move { Ok::<_, Error>(vec![1; 7 * KB]) })
.await
.unwrap();
assert_eq!(e3.source(), Source::Memory);
drop(e3);
flush_switch.on();
hybrid.memory().evict_all();
let e4 = hybrid
.get_or_fetch(&1, || async move { Ok::<_, Error>(vec![1; 7 * KB]) })
.await
.unwrap();
assert_eq!(e4.source(), Source::Memory);
drop(e4);
flush_switch.off();
hybrid.memory().remove(&1);
assert!(hybrid.memory().get(&1).is_none());
hybrid.storage().wait().await;
let e5 = hybrid
.get_or_fetch(&1, || async move { Ok::<_, Error>(vec![1; 7 * KB]) })
.await
.unwrap();
assert_eq!(e5.source(), Source::Disk);
drop(e5);
}
#[test_log::test(tokio::test)]
async fn test_hybrid_cache_fetch_error_downcast() {
#[derive(Debug, Clone, PartialEq, Eq)]
struct TestError(String);
impl std::fmt::Display for TestError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TestError: {}", self.0)
}
}
impl std::error::Error for TestError {}
let e = TestError("expected unexpection".into());
let hybrid: HybridCache<u64, Vec<u8>> = HybridCacheBuilder::new()
.with_name("test")
.memory(100)
.storage()
.build()
.await
.unwrap();
let err = hybrid
.get_or_fetch(&0, || {
let e = e.clone();
async move { Err::<Vec<u8>, _>(e) }
})
.await
.unwrap_err();
let eref = err.downcast_ref::<TestError>();
assert_eq!(eref, Some(&e));
}
}