use std::{any::Any, collections::HashMap, path::PathBuf, sync::Arc};
use arc_swap::ArcSwap;
use async_trait::async_trait;
use bytes::Buf;
use once_cell::sync::Lazy;
use pingora_cache::{
key::CompactCacheKey, trace::SpanHandle, CacheKey, CacheMeta, HitHandler, MissHandler, Storage,
};
use pingora::Result;
/// Process-wide in-memory table of cached entries.
///
/// Each value pairs an entry's metadata with its body bytes. `DiskCache::lookup`
/// consults this table (using the string built by `DiskCache::get_memory_key`)
/// before it touches the filesystem. The table starts out empty and is swapped
/// atomically through [`ArcSwap`].
// NOTE(review): nothing in this file inserts into the table; presumably it is
// populated elsewhere in the parent module (hence `pub(super)`) — confirm.
pub(super) static DISK_MEMORY_CACHE: Lazy<
    ArcSwap<HashMap<String, (DiskCacheItemMetadata, bytes::Bytes)>>,
> = Lazy::new(|| {
    let empty_table = Arc::new(HashMap::new());
    ArcSwap::new(empty_table)
});
use crate::{
cache::disk::{
handlers::{DiskCacheHitHandler, DiskCacheHitHandlerInMemory, DiskCacheMissHandler},
meta::DiskCacheItemMetadata,
},
stores,
};
/// Disk-backed cache storage.
///
/// Cached entries live in per-namespace sub-directories; see
/// `DiskCache::get_directory_for` for how the directory of a namespace is chosen.
#[derive(Debug, Clone)]
pub struct DiskCache {
    /// Fallback root directory, used for namespaces that have no routing entry
    /// in `stores::get_cache_routing_by_key`.
    pub directory: PathBuf,
}
impl DiskCache {
pub fn new() -> Self {
DiskCache {
directory: PathBuf::from("/tmp"),
}
}
pub fn get_directory_for(&self, namespace: &str) -> PathBuf {
let Some(path) = stores::get_cache_routing_by_key(namespace) else {
return self.directory.join(namespace);
};
PathBuf::from(path).join(namespace)
}
async fn get_cached_metadata(&self, key: &CacheKey) -> Option<DiskCacheItemMetadata> {
let path = self.get_directory_for(key.namespace());
let metadata_file = format!("{}.metadata", key.primary_key());
let body = tokio::fs::read(path.join(metadata_file)).await.ok()?;
serde_json::from_slice(&body).ok()
}
fn get_memory_key(key: &CacheKey) -> String {
format!("{}-{}", key.namespace(), key.primary_key())
}
}
#[async_trait]
impl Storage for DiskCache {
fn support_streaming_partial_write(&self) -> bool {
false
}
async fn lookup(
&'static self,
key: &CacheKey,
_: &SpanHandle,
) -> Result<Option<(CacheMeta, HitHandler)>> {
tracing::debug!("looking up cache for {key:?}");
let memcache_key = Self::get_memory_key(key);
if let Some((meta, body)) = DISK_MEMORY_CACHE.load().get(&memcache_key) {
tracing::debug!("found cache for {key:?} in memory");
return Ok(Some((
CacheMeta::new(
meta.fresh_until,
meta.created_at,
meta.stale_while_revalidate_sec,
meta.stale_if_error_sec,
DiskCacheItemMetadata::convert_headers(meta),
),
Box::new(DiskCacheHitHandlerInMemory::new(body.clone().reader())),
)));
}
let namespace = key.namespace();
let primary_key = key.primary_key();
let main_path = self.get_directory_for(namespace);
let cache_file = format!("{primary_key}.cache");
let file_path = main_path.join(cache_file);
let Ok(file_stream) = std::fs::OpenOptions::new().read(true).open(&file_path) else {
return Ok(None);
};
let Some(meta) = self.get_cached_metadata(key).await else {
return Ok(None);
};
tracing::debug!("found cache for {key:?}");
let buf_reader = std::io::BufReader::new(file_stream);
Ok(Some((
CacheMeta::new(
meta.fresh_until,
meta.created_at,
meta.stale_while_revalidate_sec,
meta.stale_if_error_sec,
DiskCacheItemMetadata::convert_headers(&meta),
),
Box::new(DiskCacheHitHandler::new(buf_reader, file_path, meta)),
)))
}
async fn get_miss_handler(
&'static self,
key: &CacheKey,
meta: &CacheMeta,
_: &SpanHandle,
) -> Result<MissHandler> {
tracing::debug!("getting miss handler for {key:?}");
let primary_key = key.primary_key();
let main_path = self.get_directory_for(key.namespace());
let metadata_file = format!("{primary_key}.metadata");
if let Err(err) = tokio::fs::create_dir_all(&main_path).await {
tracing::error!("failed to create directory {main_path:?}: {err}");
return Err(pingora::Error::new_str("failed to create directory"));
}
let Ok(serialized_metadata) =
serde_json::to_vec::<DiskCacheItemMetadata>(&DiskCacheItemMetadata::from(meta))
else {
return Err(pingora::Error::new_str("failed to serialize cache meta"));
};
tokio::fs::write(main_path.join(metadata_file), serialized_metadata)
.await
.ok();
Ok(Box::new(DiskCacheMissHandler::new(
key.to_owned(),
DiskCacheItemMetadata::from(meta),
main_path,
)))
}
async fn purge(&'static self, _: &CompactCacheKey, _: &SpanHandle) -> Result<bool> {
Ok(true)
}
async fn update_meta(
&'static self,
key: &CacheKey,
meta: &CacheMeta,
_: &SpanHandle,
) -> Result<bool> {
let namespace = key.namespace();
let primary_key = key.primary_key();
let main_path = self.get_directory_for(namespace);
let metadata_file = format!("{primary_key}.metadata");
let Ok(serialized_metadata) =
serde_json::to_vec::<DiskCacheItemMetadata>(&DiskCacheItemMetadata::from(meta))
else {
return Err(pingora::Error::new_str("failed to serialize cache meta"));
};
tokio::fs::write(main_path.join(metadata_file), serialized_metadata)
.await
.ok();
Ok(true)
}
fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
self
}
}