use super::LOG_TARGET;
use super::{Error, PAGE_SIZE, Result};
use crate::get_file_backends;
use async_trait::async_trait;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use pingap_core::BackgroundTask;
use pingap_core::Error as ServiceError;
use pingora::cache::key::{CacheHashKey, CompactCacheKey};
use pingora::cache::storage::MissFinishType;
use pingora::cache::storage::{HandleHit, HandleMiss};
use pingora::cache::trace::SpanHandle;
use pingora::cache::{
CacheKey, CacheMeta, HitHandler, MissHandler, PurgeType, Storage,
};
use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use tracing::info;
/// Serialized cache metadata as a pair of byte buffers.
///
/// In this file the pair is produced by `CacheMeta::serialize` and handed
/// back, in the same order, to `CacheMeta::deserialize`.
type BinaryMeta = (Vec<u8>, Vec<u8>);
/// A cached HTTP response: its serialized metadata plus the response body.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CacheObject {
/// Serialized cache metadata (see `BinaryMeta`).
pub meta: BinaryMeta,
/// Response body bytes.
pub body: Bytes,
}
/// Size threshold (in bytes) at or above which an object gets the maximum weight.
static MAX_OBJECT_CACHE_SIZE: usize = 10 * 1024 * PAGE_SIZE;

impl CacheObject {
    /// Returns the weight of this object, measured in whole pages.
    ///
    /// The size is the body length plus the length of both metadata buffers:
    /// * up to `PAGE_SIZE` bytes the weight is `1`;
    /// * at or above `MAX_OBJECT_CACHE_SIZE` the weight is `u16::MAX`;
    /// * otherwise it is `size / PAGE_SIZE`, rounded down.
    pub fn get_weight(&self) -> u16 {
        let (first, second) = &self.meta;
        let total = self.body.len() + first.len() + second.len();
        match total {
            n if n <= PAGE_SIZE => 1,
            n if n >= MAX_OBJECT_CACHE_SIZE => u16::MAX,
            // Below the cap the page count always fits in a u16.
            n => (n / PAGE_SIZE) as u16,
        }
    }
}
/// Number of bytes used by the two `u32` length prefixes at the start of an
/// encoded `CacheObject`.
const META_SIZE_LENGTH: usize = 8;

impl From<Bytes> for CacheObject {
    /// Decodes a `CacheObject` from its binary form.
    ///
    /// Layout: two big-endian `u32` lengths, then the first metadata buffer,
    /// then the second metadata buffer, then the body (all remaining bytes).
    ///
    /// Input that is too short to hold the length prefixes, or whose
    /// prefixes claim more metadata than the buffer actually contains,
    /// yields `CacheObject::default()` instead of panicking.
    fn from(value: Bytes) -> Self {
        if value.len() < META_SIZE_LENGTH {
            return Self::default();
        }
        let mut data = value;
        let meta0_size = data.get_u32() as usize;
        let meta1_size = data.get_u32() as usize;
        // `split_to` panics when asked for more bytes than remain, so a
        // truncated or corrupt blob must be rejected before splitting.
        let sizes_fit = meta0_size
            .checked_add(meta1_size)
            .is_some_and(|total| total <= data.len());
        if !sizes_fit {
            return Self::default();
        }
        let meta0 = data.split_to(meta0_size).to_vec();
        let meta1 = data.split_to(meta1_size).to_vec();
        Self {
            meta: (meta0, meta1),
            body: data,
        }
    }
}
impl From<CacheObject> for Bytes {
fn from(value: CacheObject) -> Self {
let meta_size =
value.meta.0.len() + value.meta.1.len() + META_SIZE_LENGTH;
let mut buf = BytesMut::with_capacity(value.body.len() + meta_size);
let meta0_size = value.meta.0.len() as u32;
let meta1_size = value.meta.1.len() as u32;
buf.put_u32(meta0_size);
buf.put_u32(meta1_size);
buf.extend(value.meta.0);
buf.extend(value.meta.1);
buf.extend(value.body.iter());
buf.into()
}
}
/// Counters a storage backend may report through `HttpCacheStorage::stats`.
#[derive(Debug)]
pub struct HttpCacheStats {
// NOTE(review): presumably the number of read operations in progress —
// confirm against the storage implementations.
pub reading: u32,
// NOTE(review): presumably the number of write operations in progress —
// confirm against the storage implementations.
pub writing: u32,
}
/// Outcome of an `HttpCacheStorage::clear` run.
///
/// The default `clear` implementation in this file reports `-1` for both
/// counters and an empty description.
#[derive(Debug, Clone)]
pub struct HttpCacheClearStats {
// NOTE(review): presumably the number of entries removed successfully —
// confirm against the storage implementations.
pub success: i32,
// NOTE(review): presumably the number of entries that failed to be
// removed — confirm against the storage implementations.
pub fail: i32,
/// Free-form text logged together with the counters after a clear run.
pub description: String,
}
#[async_trait]
pub trait HttpCacheStorage: Sync + Send {
async fn get(
&self,
key: &str,
namespace: &[u8],
) -> Result<Option<CacheObject>>;
async fn put(
&self,
key: &str,
namespace: &[u8],
data: CacheObject,
) -> Result<()>;
async fn remove(
&self,
_key: &str,
_namespace: &[u8],
) -> Result<Option<CacheObject>> {
Ok(None)
}
async fn clear(
&self,
_access_before: std::time::SystemTime,
) -> Result<HttpCacheClearStats> {
Ok(HttpCacheClearStats {
success: -1,
fail: -1,
description: "".to_string(),
})
}
fn stats(&self) -> Option<HttpCacheStats> {
None
}
fn inactive(&self) -> Option<Duration> {
None
}
}
async fn do_file_storage_clear(count: u32) -> Result<bool, ServiceError> {
let offset = 60;
if !count.is_multiple_of(offset) {
return Ok(false);
}
let backends = get_file_backends();
for backend in backends {
let cache = &backend.cache;
let Some(inactive_duration) = cache.inactive() else {
continue;
};
let Some(access_before) =
SystemTime::now().checked_sub(inactive_duration)
else {
return Ok(false);
};
let Ok(stats) = cache.clear(access_before).await else {
return Ok(true);
};
info!(
target: LOG_TARGET,
success = stats.success,
fail = stats.fail,
description = stats.description,
);
}
Ok(true)
}
/// Background task that periodically clears inactive file cache entries.
struct StorageClearTask {}

#[async_trait]
impl BackgroundTask for StorageClearTask {
    /// Runs the file storage clear for this tick.
    ///
    /// Errors are propagated; on success the result is always `Ok(true)`,
    /// whether or not any clearing happened on this tick.
    async fn execute(&self, count: u32) -> Result<bool, ServiceError> {
        do_file_storage_clear(count).await.map(|_| true)
    }
}
/// Builds the background task that clears inactive file cache entries.
///
/// Always returns `Some`.
pub fn new_storage_clear_service() -> Option<Box<dyn BackgroundTask>> {
    let task: Box<dyn BackgroundTask> = Box::new(StorageClearTask {});
    Some(task)
}
/// Cache front end that implements pingora's `Storage` trait on top of an
/// `HttpCacheStorage` backend.
pub struct HttpCache {
// NOTE(review): presumably the directory used by a file-backed storage and
// `None` for other backends — confirm against the code constructing this.
pub directory: Option<String>,
/// Backend that lookups, writes, meta updates and purges are delegated to.
pub cache: Arc<dyn HttpCacheStorage>,
// Value returned by `max_size()`.
// NOTE(review): unit and enforcement are not visible in this file.
pub max_size: u64,
}
impl HttpCache {
#[inline]
pub fn stats(&self) -> Option<HttpCacheStats> {
self.cache.stats()
}
pub fn max_size(&self) -> u64 {
self.max_size
}
}
/// Hit handler that serves a fully loaded body, optionally restricted to a
/// byte range.
pub struct CompleteHit {
// The complete cached body.
body: Bytes,
// Set once the selected range has been handed out; cleared again by `seek`.
done: bool,
// Start offset (inclusive) of the range returned by the next read.
range_start: usize,
// End offset (exclusive) of the range returned by the next read.
range_end: usize,
}
impl CompleteHit {
    /// Returns the currently selected range of the body exactly once.
    ///
    /// Later calls return `None` until `seek` selects a range again.
    fn get(&mut self) -> Option<Bytes> {
        if self.done {
            return None;
        }
        self.done = true;
        Some(self.body.slice(self.range_start..self.range_end))
    }

    /// Selects the byte range `[start, end)` for the next read.
    ///
    /// * `end` values past the end of the body are clamped to the body
    ///   length.
    /// * `end: None` selects everything from `start` to the end of the
    ///   body, regardless of any range chosen by an earlier seek.
    ///
    /// Returns `Error::Invalid` when `start` is not inside the body or when
    /// the clamped `end` lies before `start`; in both cases the previously
    /// selected range is left untouched.
    fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
        let size = self.body.len();
        if start >= size {
            return Err(Error::Invalid {
                message: format!("seek start out of range {start} >= {size}"),
            });
        }
        let range_end = end.map_or(size, |end| std::cmp::min(size, end));
        // An inverted range would make `Bytes::slice` panic on the next
        // read, so reject it here instead.
        if start > range_end {
            return Err(Error::Invalid {
                message: format!(
                    "seek start out of range {start} > {range_end}"
                ),
            });
        }
        self.range_start = start;
        self.range_end = range_end;
        // Seeking re-arms the handler so it can serve another range.
        self.done = false;
        Ok(())
    }
}
#[async_trait]
impl HandleHit for CompleteHit {
async fn read_body(&mut self) -> pingora::Result<Option<Bytes>> {
Ok(self.get())
}
async fn finish(
self: Box<Self>, _storage: &'static (dyn Storage + Sync),
_key: &CacheKey,
_trace: &SpanHandle,
) -> pingora::Result<()> {
Ok(())
}
fn can_seek(&self) -> bool {
true
}
fn seek(
&mut self,
start: usize,
end: Option<usize>,
) -> pingora::Result<()> {
self.seek(start, end)?;
Ok(())
}
fn as_any(&self) -> &(dyn Any + Send + Sync) {
self
}
fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync) {
self
}
}
/// Miss handler that buffers the whole response body and writes it to the
/// storage backend when the response is finished.
pub struct ObjectMissHandler {
// Serialized cache metadata stored alongside the body.
meta: BinaryMeta,
// Body bytes accumulated by `write_body`.
body: BytesMut,
// Key the object is stored under (the combined cache key hash).
key: String,
// Primary key string of the cache key; only used for logging here.
primary_key: String,
// Namespace the object is stored in.
namespace: Vec<u8>,
// Storage backend that receives the finished object.
cache: Arc<dyn HttpCacheStorage>,
}
#[async_trait]
impl HandleMiss for ObjectMissHandler {
async fn write_body(
&mut self,
data: bytes::Bytes,
_eof: bool,
) -> pingora::Result<()> {
self.body.extend(&data);
Ok(())
}
async fn finish(self: Box<Self>) -> pingora::Result<MissFinishType> {
let size = self.body.len(); info!(
target: LOG_TARGET,
key = self.key,
primary_key = self.primary_key,
namespace = std::str::from_utf8(&self.namespace).ok(),
size,
"put data to cache"
);
let _ = self
.cache
.put(
&self.key,
&self.namespace,
CacheObject {
meta: self.meta,
body: self.body.into(),
},
)
.await?;
Ok(MissFinishType::Created(size))
}
}
#[async_trait]
impl Storage for HttpCache {
async fn lookup(
&'static self,
key: &CacheKey,
_trace: &SpanHandle,
) -> pingora::Result<Option<(CacheMeta, HitHandler)>> {
let namespace = key.namespace();
let hash = key.combined();
if let Some(obj) = self.cache.get(&hash, namespace).await? {
let meta = CacheMeta::deserialize(&obj.meta.0, &obj.meta.1)?;
let size = obj.body.len();
let hit_handler = CompleteHit {
body: obj.body,
done: false,
range_start: 0,
range_end: size,
};
Ok(Some((meta, Box::new(hit_handler))))
} else {
Ok(None)
}
}
async fn get_miss_handler(
&'static self,
key: &CacheKey,
meta: &CacheMeta,
_trace: &SpanHandle,
) -> pingora::Result<MissHandler> {
let capacity = 5 * 1024;
let size = if let Some(content_length) =
meta.headers().get(http::header::CONTENT_LENGTH)
{
content_length
.to_str()
.unwrap_or_default()
.parse::<usize>()
.unwrap_or(capacity)
} else {
capacity
};
let hash = key.combined();
let meta = meta.serialize()?;
let miss_handler = ObjectMissHandler {
meta,
key: hash,
primary_key: key.primary_key_str().unwrap_or_default().to_string(),
namespace: key.namespace().to_vec(),
cache: self.cache.clone(),
body: BytesMut::with_capacity(size),
};
Ok(Box::new(miss_handler))
}
async fn purge(
&'static self,
key: &CompactCacheKey,
_type: PurgeType,
_trace: &SpanHandle,
) -> pingora::Result<bool> {
let hash = key.combined();
let cache_removed =
if let Ok(result) = self.cache.remove(&hash, b"").await {
result.is_some()
} else {
false
};
Ok(cache_removed)
}
async fn update_meta(
&'static self,
key: &CacheKey,
meta: &CacheMeta,
_trace: &SpanHandle,
) -> pingora::Result<bool> {
let namespace = key.namespace();
let hash = key.combined();
if let Some(mut obj) = self.cache.get(&hash, namespace).await? {
obj.meta = meta.serialize()?;
let _ = self.cache.put(&hash, namespace, obj).await?;
Ok(true)
} else {
Err(Error::Invalid {
message: "no meta found".to_string(),
}
.into())
}
}
fn support_streaming_partial_write(&self) -> bool {
false
}
fn as_any(&self) -> &(dyn Any + Send + Sync) {
self
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tiny::{CacheMode, TinyUfoCache};
    use bytes::{Bytes, BytesMut};
    use pingora::cache::storage::{HitHandler, MissHandler};
    use pretty_assertions::assert_eq;
    use std::sync::Arc;

    /// A fresh handler returns the whole body; after a seek it returns only
    /// the selected range.
    #[tokio::test]
    async fn test_complete_hit() {
        let payload = Bytes::from_static(b"Hello World!");
        let total = payload.len();
        let mut handle: HitHandler = Box::new(CompleteHit {
            body: payload,
            done: false,
            range_start: 0,
            range_end: total,
        });

        let first = handle.read_body().await.unwrap();
        assert_eq!(true, first.is_some());
        assert_eq!(b"Hello World!", first.unwrap().as_ref());

        handle.seek(1, Some(total - 1)).unwrap();
        let second = handle.read_body().await.unwrap();
        assert_eq!(true, second.is_some());
        assert_eq!(b"ello World", second.unwrap().as_ref());
    }

    /// Finishing a miss handler stores the buffered body in the backend.
    #[tokio::test]
    async fn test_object_miss_handler() {
        let key = "key";
        let cache = Arc::new(TinyUfoCache::new(CacheMode::Normal, 10, 10));
        let mut handle: MissHandler = Box::new(ObjectMissHandler {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: BytesMut::new(),
            key: key.to_string(),
            primary_key: "".to_string(),
            namespace: b"".to_vec(),
            cache: cache.clone(),
        });

        let chunk = Bytes::from_static(b"Hello World!");
        handle.write_body(chunk, true).await.unwrap();
        handle.finish().await.unwrap();

        let stored = cache.get(key, b"").await.unwrap().unwrap();
        assert_eq!("Hello World!", std::str::from_utf8(&stored.body).unwrap());
    }

    /// Weight is 1 for small objects, the page count for mid-sized ones and
    /// `u16::MAX` once the size cap is exceeded.
    #[test]
    fn test_cache_object_get_weight() {
        let weight_of = |body: Bytes| {
            CacheObject {
                meta: (b"Hello".to_vec(), b"World".to_vec()),
                body,
            }
            .get_weight()
        };

        assert_eq!(1, weight_of(Bytes::from_static(b"Hello World!")));
        assert_eq!(2, weight_of(vec![0; PAGE_SIZE * 2].into()));
        assert_eq!(
            u16::MAX,
            weight_of(vec![0; MAX_OBJECT_CACHE_SIZE + 1].into())
        );
    }
}