use std::{
any::Any,
io::{BufReader, Read},
path::{Path, PathBuf},
sync::Arc,
};
use async_trait::async_trait;
use pingora_cache::{
storage::{HandleHit, HandleMiss},
trace::SpanHandle,
CacheKey, Storage,
};
use pingora::Result;
use tokio::{fs::OpenOptions, io::AsyncWriteExt};
use crate::cache::disk::storage::DISK_MEMORY_CACHE;
use super::meta::DiskCacheItemMetadata;
pub struct DiskCacheHitHandler {
target: BufReader<std::fs::File>,
path: PathBuf,
meta: DiskCacheItemMetadata,
finished_buffer: bytes::BytesMut,
}
impl DiskCacheHitHandler {
pub fn new(
target: BufReader<std::fs::File>,
path: PathBuf,
meta: DiskCacheItemMetadata,
) -> Self {
DiskCacheHitHandler {
target,
path,
meta,
finished_buffer: bytes::BytesMut::new(),
}
}
}
#[async_trait]
impl HandleHit for DiskCacheHitHandler {
async fn read_body(&mut self) -> Result<Option<bytes::Bytes>> {
let mut buffer = vec![0; 32_000];
let Ok(bytes_read) = self.target.read(&mut buffer) else {
tracing::error!("failed to read completely from cache: {:?}", self.path);
return Ok(None);
};
tracing::debug!("read from cache: {bytes_read}");
if bytes_read == 0 {
return Ok(None);
}
let slice = bytes::Bytes::copy_from_slice(&buffer[..bytes_read]);
self.finished_buffer
.extend_from_slice(&buffer[..bytes_read]);
Ok(Some(slice))
}
async fn finish(
self: Box<Self>, _storage: &'static (dyn Storage + Sync),
cache_key: &CacheKey,
_: &SpanHandle,
) -> Result<()> {
let cached_data_key = format!("{}-{}", cache_key.namespace(), cache_key.primary_key());
if DISK_MEMORY_CACHE.load().contains_key(&cached_data_key) {
tracing::debug!("skipping write, cach already contains data for {cache_key:?}");
return Ok(());
}
DISK_MEMORY_CACHE.rcu(|p| {
let mut map = (**p).clone();
map.insert(
cached_data_key.clone(),
(self.meta.clone(), self.finished_buffer.clone().freeze()),
);
Arc::new(map)
});
tracing::debug!("wrote to memory cache: {:?}", self.path);
Ok(())
}
fn can_seek(&self) -> bool {
false
}
fn seek(&mut self, _start: usize, _end: Option<usize>) -> Result<()> {
Ok(())
}
fn as_any(&self) -> &(dyn Any + Send + Sync) {
self
}
}
pub struct DiskCacheMissHandler {
main_path: PathBuf,
key: CacheKey,
_meta: DiskCacheItemMetadata,
}
impl DiskCacheMissHandler {
pub fn new(
key: CacheKey,
meta: DiskCacheItemMetadata,
directory: PathBuf,
) -> DiskCacheMissHandler {
DiskCacheMissHandler {
key,
_meta: meta,
main_path: directory,
}
}
async fn write_to_file<P: AsRef<Path>>(
path: P,
content: &[u8],
) -> std::io::Result<tokio::fs::File> {
let mut file = OpenOptions::new()
.create(true) .append(true)
.open(path)
.await?;
file.write_all(content).await?;
Ok(file)
}
}
#[async_trait]
impl HandleMiss for DiskCacheMissHandler {
async fn write_body(&mut self, data: bytes::Bytes, end: bool) -> Result<()> {
let primary_key = self.key.primary_key();
let main_path = self.main_path.clone();
let cache_file = format!("{primary_key}.cache");
let Ok(_f) = Self::write_to_file(&main_path.join(&cache_file), &data).await else {
tracing::error!(
"failed to write to cache file: {:?}",
main_path.join(cache_file)
);
return Err(pingora::Error::new_str("failed to write to cache file"));
};
if end {
return Ok(());
}
Ok(())
}
async fn finish(
self: Box<Self>, ) -> pingora::Result<usize> {
Ok(0)
}
}
pub struct DiskCacheHitHandlerInMemory {
target: bytes::buf::Reader<bytes::Bytes>,
}
impl DiskCacheHitHandlerInMemory {
pub fn new(target: bytes::buf::Reader<bytes::Bytes>) -> Self {
DiskCacheHitHandlerInMemory { target }
}
}
#[async_trait]
impl HandleHit for DiskCacheHitHandlerInMemory {
async fn read_body(&mut self) -> Result<Option<bytes::Bytes>> {
let mut buffer = vec![0; 128_000];
let Ok(bytes_read) = self.target.read(&mut buffer) else {
tracing::error!("failed to read completely from MEMORY cache");
return Ok(None);
};
tracing::debug!("read from cache: {bytes_read}");
if bytes_read == 0 {
return Ok(None);
}
let slice = bytes::Bytes::copy_from_slice(&buffer[..bytes_read]);
Ok(Some(slice))
}
async fn finish(
self: Box<Self>, _storage: &'static (dyn Storage + Sync),
_cache_key: &CacheKey,
_: &SpanHandle,
) -> Result<()> {
Ok(())
}
fn can_seek(&self) -> bool {
false
}
fn seek(&mut self, _start: usize, _end: Option<usize>) -> Result<()> {
Ok(())
}
fn as_any(&self) -> &(dyn Any + Send + Sync) {
self
}
}