use std::{any::Any, collections::HashMap, path::PathBuf, time::SystemTime};
use async_trait::async_trait;
use http::StatusCode;
use pingora_cache::{
key::CompactCacheKey,
storage::{HandleHit, HandleMiss},
trace::SpanHandle,
CacheKey, CacheMeta, HitHandler, MissHandler, Storage,
};
use pingora::{http::ResponseHeader, Result};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// Filesystem-backed cache storage for pingora's HTTP cache.
///
/// Each entry is stored as two files under `directory`, grouped per
/// namespace: `<primary_key>.metadata` (JSON-serialized metadata) and
/// `<primary_key>.cache` (the raw response body).
pub struct DiskCache {
    /// Root directory holding all namespace subdirectories.
    directory: PathBuf,
}

impl DiskCache {
    /// Creates a cache rooted at the default `./tmp` directory.
    pub fn new() -> Self {
        DiskCache {
            directory: PathBuf::from("./tmp"),
        }
    }
}

// A public `new()` with no arguments should be mirrored by `Default`
// (clippy::new_without_default).
impl Default for DiskCache {
    fn default() -> Self {
        Self::new()
    }
}
/// On-disk representation of a cache entry's metadata, serialized as JSON
/// into the `<primary_key>.metadata` file next to the cached body.
#[derive(Serialize, Deserialize)]
pub struct DiskCacheItemMeta {
    /// Instant the response was first cached.
    pub created_at: SystemTime,
    /// The response is considered fresh until this point in time.
    pub fresh_until: SystemTime,
    /// Seconds the entry may be served stale while revalidation runs.
    pub stale_while_revalidate_sec: u32,
    /// Seconds the entry may be served stale if the origin errors.
    pub stale_if_error_sec: u32,
    /// Response headers, flattened to string key/value pairs for JSON.
    pub headers: HashMap<String, String>,
}
impl From<&CacheMeta> for DiskCacheItemMeta {
    /// Flattens a pingora `CacheMeta` into the JSON-serializable on-disk form.
    fn from(meta: &CacheMeta) -> Self {
        DiskCacheItemMeta {
            created_at: meta.created(),
            fresh_until: meta.fresh_until(),
            stale_while_revalidate_sec: meta.stale_while_revalidate_sec(),
            stale_if_error_sec: meta.stale_if_error_sec(),
            headers: meta
                .headers()
                .into_iter()
                // BUG FIX: header values are not guaranteed to be valid
                // UTF-8; `to_str().unwrap()` panicked on any such value.
                // Convert lossily instead of crashing the proxy.
                .map(|(k, v)| {
                    (
                        k.to_string(),
                        String::from_utf8_lossy(v.as_bytes()).into_owned(),
                    )
                })
                .collect(),
        }
    }
}
/// Rebuilds a `ResponseHeader` (status 200, no preallocated capacity) from
/// the flattened header map stored on disk. Headers that fail to insert are
/// silently skipped, mirroring the best-effort nature of the cache.
fn convert_headers(headers: HashMap<String, String>) -> ResponseHeader {
    let mut rebuilt = ResponseHeader::build_no_case(StatusCode::OK, None).unwrap();
    headers.into_iter().for_each(|(name, value)| {
        rebuilt.insert_header(name, value).ok();
    });
    rebuilt
}
#[async_trait]
impl Storage for DiskCache {
async fn lookup(
&'static self,
key: &CacheKey,
_: &SpanHandle,
) -> Result<Option<(CacheMeta, HitHandler)>> {
tracing::debug!("looking up cache for {key:?}");
let namespace = key.namespace();
let primary_key = key.primary_key();
let main_path = self.directory.join(namespace);
let metadata_file = format!("{primary_key}.metadata");
let cache_file = format!("{primary_key}.cache");
let Ok(body) = tokio::fs::read(main_path.join(metadata_file)).await else {
return Ok(None);
};
let Ok(meta) = serde_json::from_slice::<DiskCacheItemMeta>(&body) else {
return Ok(None);
};
let Ok(file_stream) = tokio::fs::File::open(main_path.join(cache_file)).await else {
return Ok(None);
};
tracing::debug!("found cache for {key:?}");
Ok(Some((
CacheMeta::new(
meta.fresh_until,
meta.created_at,
meta.stale_while_revalidate_sec,
meta.stale_if_error_sec,
convert_headers(meta.headers),
),
Box::new(DiskCacheHitHandler::new(file_stream)),
)))
}
async fn get_miss_handler(
&'static self,
key: &CacheKey,
meta: &CacheMeta,
_: &SpanHandle,
) -> Result<MissHandler> {
tracing::debug!("getting miss handler for {key:?}");
let primary_key = key.primary_key();
let main_path = PathBuf::from("./tmp").join(key.namespace());
let metadata_file = format!("{primary_key}.metadata");
if let Err(err) = tokio::fs::create_dir_all(&main_path).await {
tracing::error!("failed to create directory {main_path:?}: {err}");
return Err(pingora::Error::new_str("failed to create directory"));
}
let Ok(serialized_metadata) =
serde_json::to_vec::<DiskCacheItemMeta>(&DiskCacheItemMeta::from(meta))
else {
return Err(pingora::Error::new_str("failed to serialize cache meta"));
};
tokio::fs::write(main_path.join(metadata_file), serialized_metadata)
.await
.ok();
Ok(Box::new(DiskCacheMissHandler::new(
key.to_owned(),
DiskCacheItemMeta::from(meta),
self.directory.clone(),
)))
}
async fn purge(&'static self, _: &CompactCacheKey, _: &SpanHandle) -> Result<bool> {
Ok(true)
}
async fn update_meta(
&'static self,
key: &CacheKey,
meta: &CacheMeta,
_: &SpanHandle,
) -> Result<bool> {
let namespace = key.namespace();
let primary_key = key.primary_key();
let main_path = self.directory.join(namespace);
let metadata_file = format!("{primary_key}.metadata");
let Ok(serialized_metadata) =
serde_json::to_vec::<DiskCacheItemMeta>(&DiskCacheItemMeta::from(meta))
else {
return Err(pingora::Error::new_str("failed to serialize cache meta"));
};
tokio::fs::write(main_path.join(metadata_file), serialized_metadata)
.await
.ok();
Ok(true)
}
fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
self
}
}
/// Streams a cached body file back to the client in fixed-size chunks.
pub struct DiskCacheHitHandler {
    /// Open handle to the `<primary_key>.cache` body file.
    target: tokio::fs::File,
}

impl DiskCacheHitHandler {
    /// Wraps an already-opened cache body file.
    pub fn new(target: tokio::fs::File) -> Self {
        Self { target }
    }
}
#[async_trait]
impl HandleHit for DiskCacheHitHandler {
    /// Reads the next chunk (up to 4 KiB) of the cached body.
    /// Returns `Ok(None)` at end of file.
    async fn read_body(&mut self) -> Result<Option<bytes::Bytes>> {
        let mut buffer = [0u8; 4096];
        let Ok(bytes_read) = self.target.read(&mut buffer).await else {
            return Err(pingora::Error::new_str("failed to read from cache"));
        };
        if bytes_read == 0 {
            // EOF: the whole body has been streamed.
            return Ok(None);
        }
        // BUG FIX: only forward the bytes actually read. The original copied
        // the entire 4096-byte buffer, padding every short read (including
        // the final chunk of almost every file) with zero bytes.
        Ok(Some(bytes::Bytes::copy_from_slice(&buffer[..bytes_read])))
    }

    /// Nothing to clean up: the file handle is closed when `self` drops.
    async fn finish(
        self: Box<Self>,
        _: &'static (dyn Storage + Sync),
        _: &CacheKey,
        _: &SpanHandle,
    ) -> Result<()> {
        Ok(())
    }

    /// Seeking (range-request support) is not implemented by this handler.
    fn can_seek(&self) -> bool {
        false
    }

    /// No-op; `can_seek` returns `false`, so this should never be called.
    fn seek(&mut self, _: usize, _: Option<usize>) -> Result<()> {
        Ok(())
    }

    fn as_any(&self) -> &(dyn Any + Send + Sync) {
        self
    }
}
/// Writes a response body being fetched from upstream into the cache.
pub struct DiskCacheMissHandler {
    /// Root cache directory (mirrors `DiskCache::directory`).
    directory: PathBuf,
    /// Cache key identifying the entry being written.
    key: CacheKey,
    /// Metadata snapshot taken at miss time; persisted by the storage layer,
    /// kept here but currently unused (hence the underscore).
    _meta: DiskCacheItemMeta,
}

impl DiskCacheMissHandler {
    /// Builds a miss handler for `key` rooted at `directory`.
    pub fn new(key: CacheKey, meta: DiskCacheItemMeta, directory: PathBuf) -> DiskCacheMissHandler {
        Self {
            directory,
            key,
            _meta: meta,
        }
    }
}
#[async_trait]
impl HandleMiss for DiskCacheMissHandler {
async fn write_body(&mut self, data: bytes::Bytes, end: bool) -> Result<()> {
if end {
return Ok(());
}
let primary_key = self.key.primary_key();
let main_path = self.directory.join(self.key.namespace());
let cache_file = format!("{primary_key}.cache");
let Ok(mut file) = tokio::fs::File::create(&main_path.join(cache_file)).await else {
return Err(pingora::Error::new_str("failed to create cache file"));
};
if file.write(&data).await.is_ok() {
return Ok(());
}
Ok(())
}
async fn finish(
self: Box<Self>, ) -> Result<usize> {
Ok(0)
}
}