use std::{
any::Any,
collections::HashMap,
io::Read,
path::{Path, PathBuf},
time::SystemTime,
};
use async_trait::async_trait;
use http::StatusCode;
use pingora_cache::{
key::CompactCacheKey,
storage::{HandleHit, HandleMiss},
trace::SpanHandle,
CacheKey, CacheMeta, HitHandler, MissHandler, Storage,
};
use pingora::{http::ResponseHeader, Result};
use serde::{Deserialize, Serialize};
use tokio::{fs::OpenOptions, io::AsyncWriteExt};
use crate::stores;
pub struct DiskCache {
pub directory: PathBuf,
}
impl DiskCache {
pub fn new() -> Self {
DiskCache {
directory: PathBuf::from("/tmp"),
}
}
pub fn get_directory_for(&self, key: &str) -> PathBuf {
let Some(path) = stores::get_cache_routing_by_key(key) else {
return self.directory.join(key);
};
PathBuf::from(path).join(key)
}
}
#[derive(Serialize, Deserialize)]
pub struct DiskCacheItemMeta {
pub status: u16,
pub created_at: SystemTime,
pub fresh_until: SystemTime,
pub stale_while_revalidate_sec: u32,
pub stale_if_error_sec: u32,
pub headers: HashMap<String, String>,
}
impl From<&CacheMeta> for DiskCacheItemMeta {
fn from(meta: &CacheMeta) -> Self {
DiskCacheItemMeta {
status: meta.response_header().status.as_u16(),
created_at: meta.created(),
fresh_until: meta.fresh_until(),
stale_while_revalidate_sec: meta.stale_while_revalidate_sec(),
stale_if_error_sec: meta.stale_if_error_sec(),
headers: meta
.headers()
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string()))
.collect(),
}
}
}
fn convert_headers(headers: HashMap<String, String>, status: u16) -> ResponseHeader {
let status_code = StatusCode::from_u16(status).unwrap_or(StatusCode::OK);
let mut res_headers = ResponseHeader::build(status_code, None).unwrap();
for (k, v) in headers {
res_headers.insert_header(k, v).ok();
}
res_headers
}
#[async_trait]
impl Storage for DiskCache {
fn support_streaming_partial_write(&self) -> bool {
false
}
async fn lookup(
&'static self,
key: &CacheKey,
_: &SpanHandle,
) -> Result<Option<(CacheMeta, HitHandler)>> {
tracing::debug!("looking up cache for {key:?}");
let namespace = key.namespace();
let primary_key = key.primary_key();
let main_path = self.get_directory_for(namespace);
let metadata_file = format!("{primary_key}.metadata");
let cache_file = format!("{primary_key}.cache");
let Ok(body) = std::fs::read(main_path.join(metadata_file)) else {
return Ok(None);
};
let Ok(meta) = serde_json::from_slice::<DiskCacheItemMeta>(&body) else {
return Ok(None);
};
let file_path = main_path.join(cache_file);
let Ok(file_stream) = std::fs::OpenOptions::new().read(true).open(&file_path) else {
return Ok(None);
};
tracing::debug!("found cache for {key:?}");
Ok(Some((
CacheMeta::new(
meta.fresh_until,
meta.created_at,
meta.stale_while_revalidate_sec,
meta.stale_if_error_sec,
convert_headers(meta.headers, meta.status),
),
Box::new(DiskCacheHitHandler::new(file_stream, file_path)),
)))
}
async fn get_miss_handler(
&'static self,
key: &CacheKey,
meta: &CacheMeta,
_: &SpanHandle,
) -> Result<MissHandler> {
tracing::debug!("getting miss handler for {key:?}");
let primary_key = key.primary_key();
let main_path = self.get_directory_for(key.namespace());
let metadata_file = format!("{primary_key}.metadata");
if let Err(err) = tokio::fs::create_dir_all(&main_path).await {
tracing::error!("failed to create directory {main_path:?}: {err}");
return Err(pingora::Error::new_str("failed to create directory"));
}
let Ok(serialized_metadata) =
serde_json::to_vec::<DiskCacheItemMeta>(&DiskCacheItemMeta::from(meta))
else {
return Err(pingora::Error::new_str("failed to serialize cache meta"));
};
tokio::fs::write(main_path.join(metadata_file), serialized_metadata)
.await
.ok();
Ok(Box::new(DiskCacheMissHandler::new(
key.to_owned(),
DiskCacheItemMeta::from(meta),
main_path,
)))
}
async fn purge(&'static self, _: &CompactCacheKey, _: &SpanHandle) -> Result<bool> {
Ok(true)
}
async fn update_meta(
&'static self,
key: &CacheKey,
meta: &CacheMeta,
_: &SpanHandle,
) -> Result<bool> {
let namespace = key.namespace();
let primary_key = key.primary_key();
let main_path = self.get_directory_for(namespace);
let metadata_file = format!("{primary_key}.metadata");
let Ok(serialized_metadata) =
serde_json::to_vec::<DiskCacheItemMeta>(&DiskCacheItemMeta::from(meta))
else {
return Err(pingora::Error::new_str("failed to serialize cache meta"));
};
tokio::fs::write(main_path.join(metadata_file), serialized_metadata)
.await
.ok();
Ok(true)
}
fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
self
}
}
pub struct DiskCacheHitHandler {
target: std::fs::File,
path: PathBuf,
}
impl DiskCacheHitHandler {
pub fn new(target: std::fs::File, path: PathBuf) -> Self {
DiskCacheHitHandler { target, path }
}
}
#[async_trait]
impl HandleHit for DiskCacheHitHandler {
async fn read_body(&mut self) -> Result<Option<bytes::Bytes>> {
let mut buffer = [0; 32_000];
let Ok(bytes_read) = self.target.by_ref().read(&mut buffer) else {
tracing::error!("failed to read completely from cache: {:?}", self.path);
return Ok(None);
};
tracing::debug!("read from cache: {bytes_read}");
if bytes_read == 0 {
return Ok(None);
}
Ok(Some(bytes::Bytes::copy_from_slice(&buffer)))
}
async fn finish(
self: Box<Self>, _storage: &'static (dyn Storage + Sync),
_cache_key: &CacheKey,
_: &SpanHandle,
) -> Result<()> {
Ok(())
}
fn can_seek(&self) -> bool {
false
}
fn seek(&mut self, _start: usize, _end: Option<usize>) -> Result<()> {
Ok(())
}
fn as_any(&self) -> &(dyn Any + Send + Sync) {
self
}
}
pub struct DiskCacheMissHandler {
main_path: PathBuf,
key: CacheKey,
_meta: DiskCacheItemMeta,
}
impl DiskCacheMissHandler {
pub fn new(key: CacheKey, meta: DiskCacheItemMeta, directory: PathBuf) -> DiskCacheMissHandler {
DiskCacheMissHandler {
key,
_meta: meta,
main_path: directory,
}
}
}
async fn write_to_file<P: AsRef<Path>>(
path: P,
content: &[u8],
) -> std::io::Result<tokio::fs::File> {
let mut file = OpenOptions::new()
.create(true) .append(true)
.open(path)
.await?;
file.write_all(content).await?;
Ok(file)
}
#[async_trait]
impl HandleMiss for DiskCacheMissHandler {
async fn write_body(&mut self, data: bytes::Bytes, end: bool) -> Result<()> {
let primary_key = self.key.primary_key();
let main_path = self.main_path.clone();
let cache_file = format!("{primary_key}.cache");
let Ok(_f) = write_to_file(&main_path.join(&cache_file), &data).await else {
tracing::error!(
"failed to write to cache file: {:?}",
main_path.join(cache_file)
);
return Err(pingora::Error::new_str("failed to write to cache file"));
};
if end {
return Ok(());
}
Ok(())
}
async fn finish(
self: Box<Self>, ) -> Result<usize> {
Ok(0)
}
}