use std::{any::Any, path::PathBuf};
use async_trait::async_trait;
use pingora_cache::{
key::CompactCacheKey, trace::SpanHandle, CacheKey, CacheMeta, HitHandler, MissHandler, Storage,
};
use pingora::Result;
use crate::{
cache::disk::{
handlers::{DiskCacheHitHandler, DiskCacheMissHandler},
meta::DiskCacheItemMetadata,
},
stores,
};
/// Disk-backed cache storage for pingora, rooted at `directory`.
///
/// Each cache entry is persisted as a `<primary_key>.metadata` /
/// `<primary_key>.cache` file pair under a per-namespace directory.
#[derive(Debug, Clone)]
pub struct DiskCache {
    /// Fallback root directory used when no per-key routing override exists.
    pub directory: PathBuf,
}
impl DiskCache {
    /// Creates a cache rooted at the default `/tmp` directory.
    pub fn new() -> Self {
        DiskCache {
            directory: PathBuf::from("/tmp"),
        }
    }

    /// Resolves the directory that holds cache entries for `key`.
    ///
    /// If a routing override is registered for the key, the entry lives
    /// under that path; otherwise it falls back under `self.directory`.
    pub fn get_directory_for(&self, key: &str) -> PathBuf {
        let Some(path) = stores::get_cache_routing_by_key(key) else {
            return self.directory.join(key);
        };
        PathBuf::from(path).join(key)
    }
}

/// Mirrors `DiskCache::new()`; a type with a zero-argument `new` should
/// also implement `Default` (clippy: `new_without_default`).
impl Default for DiskCache {
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait]
impl Storage for DiskCache {
fn support_streaming_partial_write(&self) -> bool {
false
}
async fn lookup(
&'static self,
key: &CacheKey,
_: &SpanHandle,
) -> Result<Option<(CacheMeta, HitHandler)>> {
tracing::debug!("looking up cache for {key:?}");
let namespace = key.namespace();
let primary_key = key.primary_key();
let main_path = self.get_directory_for(namespace);
let metadata_file = format!("{primary_key}.metadata");
let cache_file = format!("{primary_key}.cache");
let Ok(body) = std::fs::read(main_path.join(metadata_file)) else {
return Ok(None);
};
let Ok(meta) = serde_json::from_slice::<DiskCacheItemMetadata>(&body) else {
return Ok(None);
};
let file_path = main_path.join(cache_file);
let Ok(file_stream) = std::fs::OpenOptions::new().read(true).open(&file_path) else {
return Ok(None);
};
tracing::debug!("found cache for {key:?}");
let buf_reader = std::io::BufReader::new(file_stream);
Ok(Some((
CacheMeta::new(
meta.fresh_until,
meta.created_at,
meta.stale_while_revalidate_sec,
meta.stale_if_error_sec,
DiskCacheItemMetadata::convert_headers(&meta),
),
Box::new(DiskCacheHitHandler::new(buf_reader, file_path)),
)))
}
async fn get_miss_handler(
&'static self,
key: &CacheKey,
meta: &CacheMeta,
_: &SpanHandle,
) -> Result<MissHandler> {
tracing::debug!("getting miss handler for {key:?}");
let primary_key = key.primary_key();
let main_path = self.get_directory_for(key.namespace());
let metadata_file = format!("{primary_key}.metadata");
if let Err(err) = tokio::fs::create_dir_all(&main_path).await {
tracing::error!("failed to create directory {main_path:?}: {err}");
return Err(pingora::Error::new_str("failed to create directory"));
}
let Ok(serialized_metadata) =
serde_json::to_vec::<DiskCacheItemMetadata>(&DiskCacheItemMetadata::from(meta))
else {
return Err(pingora::Error::new_str("failed to serialize cache meta"));
};
tokio::fs::write(main_path.join(metadata_file), serialized_metadata)
.await
.ok();
Ok(Box::new(DiskCacheMissHandler::new(
key.to_owned(),
DiskCacheItemMetadata::from(meta),
main_path,
)))
}
async fn purge(&'static self, _: &CompactCacheKey, _: &SpanHandle) -> Result<bool> {
Ok(true)
}
async fn update_meta(
&'static self,
key: &CacheKey,
meta: &CacheMeta,
_: &SpanHandle,
) -> Result<bool> {
let namespace = key.namespace();
let primary_key = key.primary_key();
let main_path = self.get_directory_for(namespace);
let metadata_file = format!("{primary_key}.metadata");
let Ok(serialized_metadata) =
serde_json::to_vec::<DiskCacheItemMetadata>(&DiskCacheItemMetadata::from(meta))
else {
return Err(pingora::Error::new_str("failed to serialize cache meta"));
};
tokio::fs::write(main_path.join(metadata_file), serialized_metadata)
.await
.ok();
Ok(true)
}
fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
self
}
}