use super::http_cache::{
CacheObject, HttpCacheClearStats, HttpCacheStats, HttpCacheStorage,
};
#[cfg(feature = "tracing")]
use super::{CACHE_READING_TIME, CACHE_WRITING_TIME};
use super::{Error, LOG_TARGET, PAGE_SIZE, Result};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Local};
use path_absolutize::*;
use pingap_core::TinyUfo;
#[cfg(feature = "tracing")]
use prometheus::Histogram;
use scopeguard::defer;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, SystemTime};
use tokio::fs;
use tracing::{debug, error, info};
use walkdir::WalkDir;
/// HTTP cache storage that persists cache objects as files below
/// `directory`, optionally fronted by an in-memory TinyUfo cache.
pub struct FileCache {
/// Root directory that holds the cache files.
pub directory: String,
/// Number of file reads currently in flight; incremented while `get`
/// reads from disk and decremented when that call finishes.
reading: AtomicU32,
/// Maximum number of concurrent file reads; `0` disables the limit.
reading_max: u32,
/// Histogram that receives the duration (in seconds) of each file read.
#[cfg(feature = "tracing")]
read_time: Box<Histogram>,
/// Number of file writes currently in flight; incremented while `put`
/// writes to disk and decremented when that call finishes.
writing: AtomicU32,
/// Maximum number of concurrent file writes; `0` disables the limit.
writing_max: u32,
/// Histogram that receives the duration (in seconds) of each file write.
#[cfg(feature = "tracing")]
write_time: Box<Histogram>,
/// Optional in-memory cache consulted before the file system; `None`
/// when the configured `cache_max` is `0`.
cache: Option<TinyUfo<String, CacheObject>>,
/// Objects whose weight is below this value are also stored in the
/// in-memory cache on `put`.
cache_file_max_weight: u16,
/// Inactivity period reported by `inactive()`.
cache_inactive: Duration,
/// Width (in bytes of the key) of each nested directory level; empty
/// means files are stored directly below the namespace directory.
levels: Vec<u32>,
}
/// Serde helper that parses a `levels` value such as `"1:2"`.
///
/// The input is a colon-separated list of integers. The result is the
/// parsed list when every item is a number in `0..=3` and there are at
/// most two items; any other input yields an empty list (no error is
/// reported for malformed values).
fn split_levels<'de, D>(deserializer: D) -> Result<Vec<u32>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    let raw = String::deserialize(deserializer)?;
    // `None` as soon as a single item is not a number or exceeds 3.
    let parsed: Option<Vec<u32>> = raw
        .split(':')
        .map(|part| part.parse::<u32>().ok().filter(|width| *width <= 3))
        .collect();
    Ok(match parsed {
        Some(widths) if widths.len() <= 2 => widths,
        _ => Vec::new(),
    })
}
/// Options parsed from the query-string part of a file cache
/// configuration value (`<directory>?key=value&...`).
#[derive(Debug, PartialEq, Deserialize, Serialize, Default)]
struct FileCacheParams {
/// Cache directory; filled from the part before `?` rather than from the
/// query string.
#[serde(default)]
directory: String,
/// Inactivity period in humantime format (e.g. `10m`).
#[serde(default)]
#[serde(with = "humantime_serde")]
inactive: Option<Duration>,
/// Limit of concurrent file reads.
reading_max: Option<u32>,
/// Limit of concurrent file writes.
writing_max: Option<u32>,
/// Size of the in-memory cache; `0` (the default) disables it.
#[serde(default)]
cache_max: usize,
/// Weight threshold below which objects are also kept in memory.
cache_file_max_weight: Option<usize>,
/// Directory level widths, written as `a:b` and parsed by `split_levels`.
#[serde(default)]
#[serde(deserialize_with = "split_levels")]
levels: Vec<u32>,
}
impl TryFrom<&str> for FileCacheParams {
type Error = Error;
fn try_from(value: &str) -> Result<Self> {
let (dir, query) = value.split_once('?').unwrap_or((value, ""));
let mut params = if query.is_empty() {
FileCacheParams::default()
} else {
serde_qs::from_str(query).map_err(|e| Error::Invalid {
message: e.to_string(),
})?
};
params.directory = resolve_path(dir);
Ok(params)
}
}
fn resolve_path(path_str: &str) -> String {
if path_str.is_empty() {
return String::new();
}
let path = if let Some(stripped) = path_str.strip_prefix("~/") {
dirs::home_dir()
.map(|home| home.join(stripped))
.unwrap_or_else(|| PathBuf::from(path_str))
} else {
PathBuf::from(path_str)
};
path.absolutize().map_or_else(
|_| path.to_string_lossy().into_owned(),
|p| p.to_string_lossy().into_owned(),
)
}
impl FileCache {
pub fn new(dir: &str) -> Result<Self> {
let params = FileCacheParams::try_from(dir)?;
let path = Path::new(¶ms.directory);
if !path.exists() {
std::fs::create_dir_all(path)
.map_err(|e| Error::Io { source: e })?;
}
info!(
target: LOG_TARGET,
dir = params.directory,
levels = params
.levels
.iter()
.map(|v| v.to_string())
.collect::<Vec<String>>()
.join(":"),
reading_max = params.reading_max,
writing_max = params.writing_max,
cache_max = params.cache_max,
cache_file_max_weight = params.cache_file_max_weight,
"new file cache"
);
let mut cache = None;
if params.cache_max > 0 {
cache = Some(TinyUfo::new(
params.cache_max,
params.cache_max * PAGE_SIZE,
));
}
Ok(FileCache {
directory: params.directory,
cache_file_max_weight: params
.cache_file_max_weight
.unwrap_or(1024 * 1024 / PAGE_SIZE)
as u16,
reading: AtomicU32::new(0),
reading_max: params.reading_max.unwrap_or(10_000),
#[cfg(feature = "tracing")]
read_time: CACHE_READING_TIME.clone(),
writing: AtomicU32::new(0),
writing_max: params.writing_max.unwrap_or(1_000),
#[cfg(feature = "tracing")]
write_time: CACHE_WRITING_TIME.clone(),
cache,
cache_inactive: params
.inactive
.unwrap_or(Duration::from_secs(48 * 3600)),
levels: params.levels,
})
}
#[inline]
fn get_file_path(&self, key: &str, namespace: &str) -> std::path::PathBuf {
let mut path = Path::new(&self.directory).to_path_buf();
if !namespace.is_empty() {
path.push(namespace);
};
if self.levels.is_empty() {
path.push(key);
return path;
}
let mut current_len = key.len() - 1;
for level in self.levels.iter() {
let level = *level as usize;
if current_len > level {
path.push(&key[current_len - level..current_len]);
current_len -= level;
}
}
path.push(key);
path
}
}
/// Returns the time elapsed since `time` in seconds, with millisecond
/// resolution. Yields `0.0` when `time` lies in the future.
#[cfg(feature = "tracing")]
#[inline]
fn elapsed_second(time: SystemTime) -> f64 {
    let millis = match time.elapsed() {
        Ok(duration) => duration.as_millis(),
        Err(_) => 0,
    };
    millis as f64 / 1000.0
}
#[async_trait]
impl HttpCacheStorage for FileCache {
async fn get(
&self,
key: &str,
namespace: &[u8],
) -> Result<Option<CacheObject>> {
if let Some(cache) = &self.cache
&& let Some(obj) = cache.get(&key.to_string())
{
debug!(
target: LOG_TARGET,
key, namespace, "get cache from tinyufo"
);
return Ok(Some(obj));
}
#[cfg(feature = "tracing")]
let start = SystemTime::now();
let namespace_str = std::str::from_utf8(namespace).unwrap_or_default();
let file = self.get_file_path(key, namespace_str);
let count = self.reading.fetch_add(1, Ordering::Relaxed);
defer!(self.reading.fetch_sub(1, Ordering::Relaxed););
if self.reading_max > 0 && count >= self.reading_max {
return Err(Error::OverQuota {
max: self.reading_max,
message: "too many reading".to_string(),
});
}
let result = fs::read(file).await;
#[cfg(feature = "tracing")]
self.read_time.observe(elapsed_second(start));
let obj = match result {
Ok(buf) if buf.len() >= 8 => {
Ok(Some(CacheObject::from(Bytes::from(buf))))
},
Ok(_) => Ok(None),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(Error::Io { source: e }),
}?;
if let Some(cache) = &self.cache
&& let Some(obj) = &obj
{
let weight = obj.get_weight();
cache.put(key.to_string(), obj.clone(), weight);
}
debug!(
target: LOG_TARGET,
key,
namespace =
std::string::String::from_utf8_lossy(namespace).to_string(),
"get cache from file"
);
Ok(obj)
}
async fn put(
&self,
key: &str,
namespace: &[u8],
data: CacheObject,
) -> Result<()> {
if let Some(c) = &self.cache {
let weight = data.get_weight();
if weight < self.cache_file_max_weight {
debug!(
target: LOG_TARGET,
key, namespace, "put cache to tinyufo"
);
c.put(key.to_string(), data.clone(), weight);
}
}
#[cfg(feature = "tracing")]
let start = SystemTime::now();
let buf: Bytes = data.into();
let namespace_str = std::str::from_utf8(namespace).unwrap_or_default();
let file = self.get_file_path(key, namespace_str);
let count = self.writing.fetch_add(1, Ordering::Relaxed);
defer!(self.writing.fetch_sub(1, Ordering::Relaxed););
if self.writing_max > 0 && count >= self.writing_max {
return Err(Error::OverQuota {
max: self.writing_max,
message: "too many writing".to_string(),
});
}
if let Some(parent) = file.parent() {
fs::create_dir_all(parent)
.await
.map_err(|e| Error::Io { source: e })?;
}
let result = fs::write(file, buf).await;
#[cfg(feature = "tracing")]
self.write_time.observe(elapsed_second(start));
let _ = result.map_err(|e| Error::Io { source: e })?;
debug!(
target: LOG_TARGET,
key,
namespace =
std::string::String::from_utf8_lossy(namespace).to_string(),
"put cache to file"
);
Ok(())
}
async fn remove(
&self,
key: &str,
namespace: &[u8],
) -> Result<Option<CacheObject>> {
if let Some(c) = &self.cache {
debug!(
target: LOG_TARGET,
key, namespace, "remove cache from tinyufo"
);
c.remove(&key.to_string());
}
let file = self.get_file_path(
key,
std::string::String::from_utf8_lossy(namespace).as_ref(),
);
fs::remove_file(file)
.await
.map_err(|e| Error::Io { source: e })?;
debug!(
target: LOG_TARGET,
key, namespace, "remove cache from file"
);
Ok(None)
}
#[inline]
fn stats(&self) -> Option<HttpCacheStats> {
Some(HttpCacheStats {
reading: self.reading.load(Ordering::Relaxed),
writing: self.writing.load(Ordering::Relaxed),
})
}
async fn clear(
&self,
access_before: SystemTime,
) -> Result<HttpCacheClearStats> {
let mut success = 0;
let mut fail = 0;
let datetime_local: DateTime<Local> = access_before.into();
let description = format!(
"clear cache file, directory: {}, access before: {datetime_local}",
self.directory
);
for entry in WalkDir::new(&self.directory)
.into_iter()
.filter_map(|item| item.ok())
.filter(|item| !item.path().is_dir())
{
let Ok(metadata) = entry.metadata() else {
continue;
};
let Ok(accessed) = metadata.accessed() else {
continue;
};
if accessed > access_before {
continue;
}
let path = entry.path();
let file = path.to_string_lossy().to_string();
match fs::remove_file(path).await {
Ok(()) => {
info!(
target: LOG_TARGET,
file, "remove cache file success"
);
success += 1;
},
Err(e) => {
fail += 1;
error!(
target: LOG_TARGET,
error = %e,
file,
"remove cache file fail"
);
},
};
}
Ok(HttpCacheClearStats {
success,
fail,
description,
})
}
fn inactive(&self) -> Option<Duration> {
Some(self.cache_inactive)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use pretty_assertions::assert_eq;
    use std::fs::File;
    use std::time::{Duration, SystemTime};
    use tempfile::{TempDir, tempdir};

    /// Query-string options are decoded and `~/` is expanded.
    #[test]
    fn test_parse_params() {
        let params = FileCacheParams::try_from(
            "~/pingap?reading_max=1000&writing_max=500&cache_max=100&inactive=10m&levels=1:2",
        ).unwrap();
        assert_eq!(params.reading_max, Some(1000));
        assert_eq!(params.writing_max, Some(500));
        assert_eq!(params.cache_max, 100);
        assert_eq!(params.inactive, Some(Duration::from_secs(600)));
        assert_eq!(params.levels, vec![1, 2]);
        assert!(
            params
                .directory
                .starts_with(dirs::home_dir().unwrap().to_str().unwrap())
        );
    }

    /// Round trip through memory and file layers, including a second
    /// `FileCache` instance that only sees the file.
    #[tokio::test]
    async fn test_file_cache_integration() {
        let dir = tempdir().unwrap();
        let dir_path_str = dir.path().to_str().unwrap();
        let namespace = b"my-namespace";
        // `cache_file_max_weight` is the option name `FileCacheParams`
        // declares; an unknown name would be silently ignored.
        let cache_config = format!(
            "{}?cache_max=100&cache_file_max_weight=1024",
            dir_path_str
        );
        let cache = FileCache::new(&cache_config).unwrap();
        let key = "my-test-key";
        let obj = CacheObject {
            meta: (b"Meta-Key".to_vec(), b"Meta-Value".to_vec()),
            body: Bytes::from_static(b"Hello World!"),
        };
        assert!(
            cache.get(key, namespace).await.unwrap().is_none(),
            "Initial get should be a miss"
        );
        cache.put(key, namespace, obj.clone()).await.unwrap();
        let cached_obj = cache.get(key, namespace).await.unwrap().unwrap();
        assert_eq!(obj, cached_obj);
        assert!(
            cache
                .cache
                .as_ref()
                .unwrap()
                .get(&key.to_string())
                .is_some()
        );
        // A fresh instance has an empty memory layer and must read the file.
        let fresh_cache = FileCache::new(&cache_config).unwrap();
        let file_obj = fresh_cache.get(key, namespace).await.unwrap().unwrap();
        assert_eq!(obj, file_obj);
        assert!(
            fresh_cache
                .cache
                .as_ref()
                .unwrap()
                .get(&key.to_string())
                .is_some()
        );
        fresh_cache.remove(key, namespace).await.unwrap();
        assert!(
            fresh_cache
                .cache
                .as_ref()
                .unwrap()
                .get(&key.to_string())
                .is_none()
        );
        assert!(
            fresh_cache.get(key, namespace).await.unwrap().is_none(),
            "Get after remove should be a miss"
        );
    }

    /// `clear` removes only files accessed before the cut-off.
    #[tokio::test]
    async fn test_cache_clear() {
        let dir = tempdir().unwrap();
        let cache = FileCache::new(dir.path().to_str().unwrap()).unwrap();
        let old_file_path = cache.get_file_path("old_key", "ns");
        fs::create_dir_all(old_file_path.parent().unwrap())
            .await
            .unwrap();
        File::create(&old_file_path).unwrap();
        let old_time = SystemTime::now() - Duration::from_secs(3600);
        filetime::set_file_atime(
            &old_file_path,
            filetime::FileTime::from_system_time(old_time),
        )
        .unwrap();
        let new_file_path = cache.get_file_path("new_key", "ns");
        File::create(&new_file_path).unwrap();
        let access_before = SystemTime::now() - Duration::from_secs(600);
        let stats = cache.clear(access_before).await.unwrap();
        assert_eq!(stats.success, 1);
        assert_eq!(stats.fail, 0);
        assert!(!old_file_path.exists());
        assert!(new_file_path.exists());
    }

    /// Path layout with and without directory levels.
    #[test]
    fn test_get_file_path() {
        let dir = tempdir().unwrap();
        let cache_no_levels =
            FileCache::new(dir.path().to_str().unwrap()).unwrap();
        let path1 = cache_no_levels.get_file_path("mykey", "namespace");
        assert!(path1.to_string_lossy().ends_with("/namespace/mykey"));
        let cache_with_levels_config =
            format!("{}?levels=1:2", dir.path().to_str().unwrap());
        let cache_with_levels =
            FileCache::new(&cache_with_levels_config).unwrap();
        let key = "abcdef123456";
        let path2 = cache_with_levels.get_file_path(key, "ns");
        assert!(path2.to_string_lossy().ends_with("/ns/5/34/abcdef123456"));
    }

    /// get / put / remove / clear against a pre-created namespace directory.
    #[tokio::test]
    async fn test_file_cache() {
        let tmp = TempDir::new().unwrap();
        let namespace = b"pingap";
        std::fs::create_dir(
            tmp.path()
                .join(std::string::String::from_utf8_lossy(namespace).as_ref()),
        )
        .unwrap();
        let config = format!("{}?cache_max=100", tmp.path().to_string_lossy());
        let cache = FileCache::new(&config).unwrap();
        let key = "key";
        let obj = CacheObject {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: Bytes::from_static(b"Hello World!"),
        };
        let result = cache.get(key, namespace).await.unwrap();
        assert!(result.is_none());
        cache.put(key, namespace, obj.clone()).await.unwrap();
        assert!(
            cache
                .cache
                .as_ref()
                .unwrap()
                .get(&key.to_string())
                .is_some()
        );
        let result = cache.get(key, namespace).await.unwrap().unwrap();
        assert_eq!(obj, result);
        // New instance: the object must come from the file.
        let cache = FileCache::new(&config).unwrap();
        let result = cache.get(key, namespace).await.unwrap().unwrap();
        assert_eq!(obj, result);
        assert!(
            cache
                .cache
                .as_ref()
                .unwrap()
                .get(&key.to_string())
                .is_some()
        );
        cache.remove(key, namespace).await.unwrap();
        assert!(
            cache
                .cache
                .as_ref()
                .unwrap()
                .get(&key.to_string())
                .is_none()
        );
        let result = cache.get(key, namespace).await.unwrap();
        assert!(result.is_none());
        cache.put(key, namespace, obj.clone()).await.unwrap();
        cache
            .clear(
                SystemTime::now()
                    .checked_add(Duration::from_secs(365 * 24 * 3600))
                    .unwrap(),
            )
            .await
            .unwrap();
    }

    /// Counters start at zero.
    #[test]
    fn test_stats() {
        // Keep the `TempDir` guard alive so the directory is removed when
        // the test ends instead of being left behind.
        let dir = TempDir::new().unwrap();
        let cache = FileCache::new(&dir.path().to_string_lossy()).unwrap();
        assert_eq!(0, cache.stats().unwrap().reading);
        assert_eq!(0, cache.stats().unwrap().writing);
    }

    /// Home expansion, empty input and relative paths.
    #[test]
    fn test_resolve_path() {
        assert_eq!(
            dirs::home_dir().unwrap().to_string_lossy().to_string(),
            resolve_path("~/")
        );
        assert_eq!("", resolve_path(""));
        let path = resolve_path("../pingap");
        assert!(path.ends_with("/pingap"));
        assert!(!path.starts_with(".."));
    }
}