use dashmap::DashMap;
use memmap2::Mmap;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::{Arc, OnceLock};
use std::time::{Duration, SystemTime};
use crate::dev_print;
use crate::SendableError;
/// A read-only, memory-mapped view of a file for zero-copy access.
#[derive(Debug)]
pub struct ZeroCopyFile {
    /// The active memory mapping over `_file`.
    mmap: Mmap,
    /// Keeps the file handle alive for as long as the mapping exists.
    _file: File,
    /// Last time this view was read (see `update_access_time`).
    last_accessed: SystemTime,
    /// File size in bytes, captured from metadata when the map was created.
    file_size: usize,
}
impl ZeroCopyFile {
    /// Opens and memory-maps the file at `path`.
    ///
    /// # Errors
    /// Fails if the file cannot be opened, its metadata cannot be read, it is
    /// empty (zero-length files cannot be usefully mapped), or mapping fails.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, SendableError> {
        let file = File::open(path.as_ref())?;
        let metadata = file.metadata()?;
        let file_size = metadata.len() as usize;
        // Reject empty files up front: mapping a zero-length file is either an
        // error or platform-dependent.
        if file_size == 0 {
            return Err("Cannot memory map empty file".into());
        }
        // SAFETY: soundness relies on the file not being truncated or modified
        // by another process while mapped (a memmap2 caveat); `_file` keeps the
        // handle alive for the mapping's lifetime.
        let mmap = unsafe { Mmap::map(&file)? };
        Ok(Self {
            mmap,
            _file: file,
            last_accessed: SystemTime::now(),
            file_size,
        })
    }
    /// Borrows the mapped bytes.
    pub fn as_bytes(&self) -> &[u8] {
        &self.mmap
    }
    /// Interprets the mapped bytes as UTF-8.
    ///
    /// # Errors
    /// Fails if the contents are not valid UTF-8.
    pub fn as_str(&self) -> Result<&str, SendableError> {
        Ok(std::str::from_utf8(&self.mmap)?)
    }
    /// File size in bytes, captured when the map was created.
    pub fn len(&self) -> usize {
        self.file_size
    }
    /// Always `false` in practice, since `new` rejects empty files.
    pub fn is_empty(&self) -> bool {
        self.file_size == 0
    }
    /// Records the current time as the last access (used for eviction logic).
    pub fn update_access_time(&mut self) {
        self.last_accessed = SystemTime::now();
    }
    /// Time of the most recent recorded access.
    pub fn last_accessed(&self) -> SystemTime {
        self.last_accessed
    }
    /// Deserializes the mapped contents as JSON into `T`.
    ///
    /// # Errors
    /// Fails on invalid UTF-8 or malformed JSON.
    pub fn parse_json<T>(&self) -> Result<T, SendableError>
    where
        T: for<'de> serde::de::Deserialize<'de>,
    {
        let json_str = self.as_str()?;
        Ok(serde_json::from_str(json_str)?)
    }
    /// Borrows the byte range `[start, end)` of the mapped file.
    ///
    /// # Errors
    /// Fails if `start > end` or `end` exceeds the mapped length.
    pub fn slice(&self, start: usize, end: usize) -> Result<&[u8], SendableError> {
        if end > self.mmap.len() || start > end {
            return Err(format!(
                "Index out of bounds: start={}, end={}, len={}",
                start, end, self.mmap.len()
            ).into());
        }
        Ok(&self.mmap[start..end])
    }
    /// Iterates over the mapped contents line by line (`\n` and `\r\n` endings).
    pub fn lines(&self) -> LineIterator<'_> {
        LineIterator {
            data: &self.mmap,
            pos: 0,
        }
    }
    /// Returns the offset of the first occurrence of `pattern`, if any.
    ///
    /// An empty pattern matches at offset 0, consistent with `str::find("")`.
    pub fn find(&self, pattern: &[u8]) -> Option<usize> {
        // BUG FIX: `windows(0)` panics ("window size must be non-zero"), so the
        // empty pattern must be handled explicitly.
        if pattern.is_empty() {
            return Some(0);
        }
        self.mmap
            .windows(pattern.len())
            .position(|window| window == pattern)
    }
}
/// Borrowing iterator over the lines of a byte buffer.
///
/// Each yielded line excludes its terminating `\n` and any immediately
/// preceding `\r`, so both Unix and Windows line endings are handled. A final
/// line with no trailing newline is yielded exactly as stored.
pub struct LineIterator<'a> {
    data: &'a [u8],
    pos: usize,
}
impl<'a> Iterator for LineIterator<'a> {
    type Item = &'a [u8];
    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.data.len() {
            return None;
        }
        let rest = &self.data[self.pos..];
        match rest.iter().position(|&byte| byte == b'\n') {
            Some(newline_at) => {
                // Skip past the newline so the next call starts on the next line.
                self.pos += newline_at + 1;
                let line = &rest[..newline_at];
                // Normalize CRLF by trimming one trailing '\r', if present.
                Some(line.strip_suffix(b"\r").unwrap_or(line))
            }
            None => {
                // No trailing newline: emit the remainder and exhaust the iterator.
                self.pos = self.data.len();
                Some(rest)
            }
        }
    }
}
/// An owned, in-memory copy of a file's contents plus cache bookkeeping.
#[derive(Debug)]
pub struct CachedFileData {
    /// The file's bytes, fully read into memory.
    data: Vec<u8>,
    /// Last read time; drives LRU eviction and expiry.
    last_accessed: SystemTime,
    /// File modification time at read; used to detect stale entries.
    modified_time: SystemTime,
    /// Path the data was read from.
    file_path: PathBuf,
    /// Byte length of `data` at creation time.
    original_size: usize,
}
impl CachedFileData {
    /// Wraps already-read file bytes together with cache metadata.
    /// `modified_time` should be the file's mtime at the moment of reading.
    pub fn new(data: Vec<u8>, file_path: PathBuf, modified_time: SystemTime) -> Self {
        let original_size = data.len();
        Self {
            data,
            last_accessed: SystemTime::now(),
            modified_time,
            file_path,
            original_size,
        }
    }
    /// Returns the cached file's path and its size in bytes.
    pub fn get_info(&self) -> (PathBuf, usize) {
        (self.file_path.clone(), self.original_size)
    }
    /// Borrows the cached bytes.
    pub fn as_bytes(&self) -> &[u8] {
        &self.data
    }
    /// Interprets the cached bytes as UTF-8.
    ///
    /// # Errors
    /// Fails if the data is not valid UTF-8.
    pub fn as_str(&self) -> Result<&str, SendableError> {
        Ok(std::str::from_utf8(&self.data)?)
    }
    /// Size of the cached data in bytes.
    pub fn len(&self) -> usize {
        self.data.len()
    }
    /// True if the cached data is zero bytes.
    /// (Added for consistency with `ZeroCopyFile::is_empty` and clippy's
    /// `len_without_is_empty`.)
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
    /// Records the current time as the last access (used for eviction/expiry).
    pub fn update_access_time(&mut self) {
        self.last_accessed = SystemTime::now();
    }
    /// Time of the most recent recorded access.
    pub fn last_accessed(&self) -> SystemTime {
        self.last_accessed
    }
    /// Deserializes the cached contents as JSON into `T`.
    ///
    /// # Errors
    /// Fails on invalid UTF-8 or malformed JSON.
    pub fn parse_json<T>(&self) -> Result<T, SendableError>
    where
        T: for<'de> serde::de::Deserialize<'de>,
    {
        let json_str = self.as_str()?;
        Ok(serde_json::from_str(json_str)?)
    }
}
/// Tunable limits for a `ZeroCopyCache`.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Maximum number of files kept in the in-memory cache.
    pub max_cache_files: usize,
    /// Files larger than this (bytes) bypass the cache and are memory-mapped.
    pub max_cache_file_size: usize,
    /// How long an entry may sit unread before `cleanup_expired` removes it.
    pub cache_duration: Duration,
    /// Upper bound (bytes) on the combined size of all cached files.
    pub total_cache_size_limit: usize,
}
impl Default for CacheConfig {
    /// 50 files, 1 MiB per file, 50 MiB total, 5-minute expiry.
    fn default() -> Self {
        Self {
            max_cache_files: 50,
            max_cache_file_size: 1024 * 1024,
            cache_duration: Duration::from_secs(300),
            total_cache_size_limit: 50 * 1024 * 1024,
        }
    }
}
/// In-memory file cache with count- and size-bounded eviction.
///
/// Small files are cached as owned byte buffers in a concurrent map; files
/// above `max_cache_file_size` are served via direct memory mapping instead.
pub struct ZeroCopyCache {
    /// Cached file contents, keyed by path.
    memory_cache: Arc<DashMap<PathBuf, CachedFileData>>,
    /// Maximum number of entries before the oldest is evicted.
    max_cache_files: usize,
    /// Per-file size ceiling (bytes) for cache eligibility.
    max_cache_file_size: usize,
    /// Idle lifetime after which `cleanup_expired` removes an entry.
    cache_duration: Duration,
    /// Combined size ceiling (bytes) across all cached entries.
    total_cache_size_limit: usize,
}
/// Lazily-initialized process-wide cache backing `ZeroCopyCache::global`.
static GLOBAL_CACHE: OnceLock<ZeroCopyCache> = OnceLock::new();
impl ZeroCopyCache {
    /// Creates a cache with explicit limits. Prefer [`ZeroCopyCache::from_config`].
    pub fn new(
        max_cache_files: usize,
        max_cache_file_size: usize,
        total_cache_size_limit: usize,
        cache_duration: Duration,
    ) -> Self {
        Self {
            memory_cache: Arc::new(DashMap::new()),
            max_cache_files,
            max_cache_file_size,
            total_cache_size_limit,
            cache_duration,
        }
    }
    /// Builds a cache from a [`CacheConfig`].
    pub fn from_config(config: CacheConfig) -> Self {
        Self::new(
            config.max_cache_files,
            config.max_cache_file_size,
            config.total_cache_size_limit,
            config.cache_duration,
        )
    }
    /// Cache with [`CacheConfig::default`] limits.
    ///
    /// NOTE(review): inherent `default` kept for API compatibility; consider
    /// an `impl Default` in a future breaking release (clippy
    /// `should_implement_trait`).
    pub fn default() -> Self {
        Self::from_config(CacheConfig::default())
    }
    /// Initializes the process-wide cache with `config` (or defaults).
    ///
    /// # Errors
    /// Fails if the global cache was already initialized — including
    /// implicitly, by an earlier call to [`ZeroCopyCache::global`].
    pub fn init_global(config: Option<CacheConfig>) -> Result<(), &'static str> {
        let cache_config = config.unwrap_or_default();
        let cache = Self::from_config(cache_config);
        GLOBAL_CACHE.set(cache)
            .map_err(|_| "Global cache already initialized")
    }
    /// Returns the process-wide cache, creating one with defaults on first use.
    pub fn global() -> &'static ZeroCopyCache {
        GLOBAL_CACHE.get_or_init(Self::default)
    }
    /// Loads `path`: small files are served from (and stored in) the memory
    /// cache; files above `max_cache_file_size` are memory-mapped directly.
    ///
    /// # Errors
    /// Fails if the file's metadata cannot be read, or if reading/mapping fails.
    pub fn load_file<P: AsRef<Path>>(&self, path: P) -> Result<FileLoadResult, SendableError> {
        let path_buf = path.as_ref().to_path_buf();
        let metadata = std::fs::metadata(&path_buf)?;
        let file_size = metadata.len() as usize;
        if file_size <= self.max_cache_file_size {
            // Fall back to "now" when the platform cannot report mtime; the
            // entry is then merely treated as freshly modified.
            let modified_time = metadata.modified().unwrap_or_else(|_| SystemTime::now());
            return self.load_from_memory_cache(path_buf, file_size, modified_time);
        }
        dev_print!("Large file detected ({}MB), using direct memmap", file_size / (1024 * 1024));
        let zero_copy_file = ZeroCopyFile::new(&path_buf)?;
        Ok(FileLoadResult::DirectMemoryMap(zero_copy_file))
    }
    /// Returns cached bytes for `path_buf`, (re)reading the file when absent
    /// or when its modification time no longer matches the cached entry.
    fn load_from_memory_cache(&self, path_buf: PathBuf, file_size: usize, modified_time: SystemTime) -> Result<FileLoadResult, SendableError> {
        if let Some(mut cached_data) = self.memory_cache.get_mut(&path_buf) {
            if cached_data.modified_time == modified_time {
                cached_data.update_access_time();
                dev_print!("File loaded from memory cache: {:?} ({}KB)", path_buf, file_size / 1024);
                return Ok(FileLoadResult::MemoryCache(cached_data.data.clone()));
            }
            dev_print!("File modified, reloading cache: {:?}", path_buf);
            // Release the shard guard before re-reading and re-inserting below;
            // holding it across `insert` would deadlock.
            drop(cached_data);
        }
        dev_print!("Loading file to memory cache: {:?} ({}KB)", path_buf, file_size / 1024);
        let file_data = std::fs::read(&path_buf)?;
        // One clone is required: the cache entry and the returned result each
        // need an owned copy of the bytes.
        let cached_data = CachedFileData::new(file_data.clone(), path_buf.clone(), modified_time);
        self.ensure_cache_capacity(&self.memory_cache, file_size);
        self.memory_cache.insert(path_buf, cached_data);
        Ok(FileLoadResult::MemoryCache(file_data))
    }
    /// Evicts entries until both the file-count limit and the total-size limit
    /// leave room for a new entry of `new_file_size` bytes.
    ///
    /// Takes `&DashMap` instead of a by-value `Arc` so callers need not bump
    /// the refcount just to call a private helper.
    fn ensure_cache_capacity(&self, cache: &DashMap<PathBuf, CachedFileData>, new_file_size: usize) {
        while cache.len() >= self.max_cache_files {
            self.evict_oldest_from_cache(cache);
        }
        let current_total_size: usize = cache.iter().map(|f| f.original_size).sum();
        let mut remaining_size = current_total_size + new_file_size;
        while remaining_size > self.total_cache_size_limit && !cache.is_empty() {
            if let Some(evicted_size) = self.evict_oldest_from_cache(cache) {
                remaining_size -= evicted_size;
            } else {
                break;
            }
        }
    }
    /// Removes the least-recently-accessed entry, returning its size in bytes,
    /// or `None` if the cache is empty.
    fn evict_oldest_from_cache(&self, cache: &DashMap<PathBuf, CachedFileData>) -> Option<usize> {
        if cache.is_empty() {
            return None;
        }
        // Clone the key out of the iterator expression first, so no shard
        // guard is still held when `remove` runs.
        let oldest_entry = cache
            .iter()
            .min_by_key(|file| file.last_accessed())
            .map(|file| (file.key().clone(), file.len()));
        if let Some((oldest_path, size)) = oldest_entry {
            cache.remove(&oldest_path);
            dev_print!("Evicted file from memory cache: {:?} ({}KB)", oldest_path, size / 1024);
            return Some(size);
        }
        None
    }
    /// Drops every entry whose idle time exceeds `cache_duration`.
    pub fn cleanup_expired(&self) {
        let now = SystemTime::now();
        // Collect keys first so no shard guards are held during removal.
        let expired_keys: Vec<PathBuf> = self.memory_cache
            .iter()
            .filter_map(|file| {
                if now.duration_since(file.last_accessed()).unwrap_or_default() > self.cache_duration {
                    Some(file.key().clone())
                } else {
                    None
                }
            })
            .collect();
        for key in expired_keys {
            if let Some((_path, removed)) = self.memory_cache.remove(&key) {
                dev_print!("Removed expired file from cache: {:?} ({}KB)", key, removed.original_size / 1024);
            }
        }
    }
    /// Snapshot of current cache occupancy and configured limits.
    pub fn stats(&self) -> CacheStats {
        let total_size: usize = self.memory_cache.iter().map(|f| f.original_size).sum();
        CacheStats {
            file_count: self.memory_cache.len(),
            total_size,
            max_cache_size: self.max_cache_files,
            max_file_size: self.max_cache_file_size,
            total_cache_size_limit: self.total_cache_size_limit,
        }
    }
    /// Removes every cached entry.
    pub fn clear_cache(&self) {
        let count = self.memory_cache.len();
        let total_size: usize = self.memory_cache.iter().map(|f| f.original_size).sum();
        self.memory_cache.clear();
        dev_print!("Cleared memory cache: {} files, {}MB", count, total_size / (1024 * 1024));
    }
}
/// The outcome of `ZeroCopyCache::load_file`: either an owned copy from the
/// in-memory cache, or a direct memory map for files too large to cache.
pub enum FileLoadResult {
    /// Owned bytes served from (and stored in) the memory cache.
    MemoryCache(Vec<u8>),
    /// A zero-copy memory mapping, used for files above the cache size limit.
    DirectMemoryMap(ZeroCopyFile),
}
impl FileLoadResult {
    /// Borrows the loaded bytes regardless of backing storage.
    pub fn as_bytes(&self) -> &[u8] {
        match self {
            FileLoadResult::MemoryCache(data) => data,
            FileLoadResult::DirectMemoryMap(mmap_file) => mmap_file.as_bytes(),
        }
    }
    /// Interprets the loaded bytes as UTF-8.
    ///
    /// # Errors
    /// Fails if the contents are not valid UTF-8.
    pub fn as_str(&self) -> Result<&str, SendableError> {
        match self {
            FileLoadResult::MemoryCache(data) => Ok(std::str::from_utf8(data)?),
            FileLoadResult::DirectMemoryMap(mmap_file) => mmap_file.as_str(),
        }
    }
    /// Size of the loaded data in bytes.
    pub fn len(&self) -> usize {
        match self {
            FileLoadResult::MemoryCache(data) => data.len(),
            FileLoadResult::DirectMemoryMap(mmap_file) => mmap_file.len(),
        }
    }
    /// True when the loaded data has zero bytes.
    /// (Added for consistency with the other containers in this module and
    /// clippy's `len_without_is_empty`.)
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Deserializes the loaded contents as JSON into `T`.
    ///
    /// # Errors
    /// Fails on invalid UTF-8 or malformed JSON.
    pub fn parse_json<T>(&self) -> Result<T, SendableError>
    where
        T: for<'de> serde::de::Deserialize<'de>,
    {
        match self {
            FileLoadResult::MemoryCache(data) => {
                let json_str = std::str::from_utf8(data)?;
                Ok(serde_json::from_str(json_str)?)
            }
            FileLoadResult::DirectMemoryMap(mmap_file) => mmap_file.parse_json(),
        }
    }
    /// True if this result was served from the in-memory cache.
    pub fn is_memory_cached(&self) -> bool {
        matches!(self, FileLoadResult::MemoryCache(_))
    }
    /// True if this result is backed by a direct memory map.
    pub fn is_memory_mapped(&self) -> bool {
        matches!(self, FileLoadResult::DirectMemoryMap(_))
    }
}
/// Point-in-time snapshot of cache occupancy and configured limits.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of files currently cached.
    pub file_count: usize,
    /// Combined size (bytes) of all cached files.
    pub total_size: usize,
    /// Configured maximum number of cached files.
    pub max_cache_size: usize,
    /// Configured per-file size ceiling (bytes).
    pub max_file_size: usize,
    /// Configured total-size ceiling (bytes).
    pub total_cache_size_limit: usize,
}
impl std::fmt::Display for CacheStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Report byte counts in mebibytes for readability.
        const MIB: f64 = 1_048_576.0;
        let used_mb = self.total_size as f64 / MIB;
        let limit_mb = self.total_cache_size_limit as f64 / MIB;
        let per_file_mb = self.max_file_size as f64 / MIB;
        write!(
            f,
            "Memory Cache Stats: {} files, {:.2}/{:.2} MB used, max {} files, max {:.2} MB per file",
            self.file_count, used_mb, limit_mb, self.max_cache_size, per_file_mb
        )
    }
}
/// Parses a JSON file into `T`, choosing the read strategy by file size:
/// files up to 1 MiB are read into memory, larger files are memory-mapped.
///
/// # Errors
/// Fails on I/O errors, invalid UTF-8, or malformed JSON.
pub fn parse_json_file<T, P>(path: P) -> Result<T, SendableError>
where
    T: for<'de> serde::de::Deserialize<'de>,
    P: AsRef<Path>,
{
    const SMALL_FILE_LIMIT: usize = 1024 * 1024;
    let file_size = std::fs::metadata(&path)?.len() as usize;
    if file_size <= SMALL_FILE_LIMIT {
        dev_print!("Small JSON file, using standard read: {}KB", file_size / 1024);
        let data = std::fs::read(&path)?;
        let json_str = std::str::from_utf8(&data)?;
        Ok(serde_json::from_str(json_str)?)
    } else {
        dev_print!("Large JSON file, using zero-copy mmap: {}MB", file_size / (1024 * 1024));
        ZeroCopyFile::new(path)?.parse_json()
    }
}
/// Parses a JSON file into `T` through `cache`.
///
/// The cache parameter is now a shared reference: `ZeroCopyCache::load_file`
/// only needs `&self`, and `&mut ZeroCopyCache` coerces to `&ZeroCopyCache`
/// at call sites, so existing callers keep compiling.
///
/// # Errors
/// Fails on I/O errors, invalid UTF-8, or malformed JSON.
pub fn parse_json_file_cached<T, P>(path: P, cache: &ZeroCopyCache) -> Result<T, SendableError>
where
    T: for<'de> serde::de::Deserialize<'de>,
    P: AsRef<Path>,
{
    let file_result = cache.load_file(path)?;
    file_result.parse_json()
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    // All fixture files are tiny (far below max_cache_file_size), so every
    // test below exercises the memory-cache path of `load_file`.
    fn create_test_cache() -> ZeroCopyCache {
        ZeroCopyCache::from_config(CacheConfig {
            max_cache_files: 10,
            max_cache_file_size: 1024 * 1024,
            total_cache_size_limit: 10 * 1024 * 1024,
            cache_duration: Duration::from_secs(300),
        })
    }
    /// A second load of an unchanged file is served from the cache and still
    /// returns the original contents.
    #[test]
    fn test_cache_returns_fresh_data() {
        let cache = create_test_cache();
        let dir = std::env::temp_dir().join("atomic_http_test_fresh");
        std::fs::create_dir_all(&dir).unwrap();
        let file_path = dir.join("test_fresh.txt");
        std::fs::write(&file_path, b"hello").unwrap();
        let result1 = cache.load_file(&file_path).unwrap();
        assert_eq!(result1.as_bytes(), b"hello");
        let result2 = cache.load_file(&file_path).unwrap();
        assert_eq!(result2.as_bytes(), b"hello");
        assert!(result2.is_memory_cached());
        // Best-effort cleanup; failures here must not fail the test.
        std::fs::remove_file(&file_path).ok();
        std::fs::remove_dir(&dir).ok();
    }
    /// Rewriting a file with a newer mtime must invalidate the cached entry
    /// so the next load returns the new contents.
    #[test]
    fn test_cache_invalidates_on_file_modification() {
        let cache = create_test_cache();
        let dir = std::env::temp_dir().join("atomic_http_test_mtime");
        std::fs::create_dir_all(&dir).unwrap();
        let file_path = dir.join("test_mtime.txt");
        std::fs::write(&file_path, b"version1").unwrap();
        let result1 = cache.load_file(&file_path).unwrap();
        assert_eq!(result1.as_bytes(), b"version1");
        // mtime resolution can be as coarse as 1s on some filesystems; sleep
        // so the rewrite is guaranteed a distinct modification time.
        std::thread::sleep(Duration::from_secs(1));
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .truncate(true)
            .open(&file_path)
            .unwrap();
        file.write_all(b"version2").unwrap();
        file.flush().unwrap();
        drop(file);
        let result2 = cache.load_file(&file_path).unwrap();
        assert_eq!(result2.as_bytes(), b"version2");
        std::fs::remove_file(&file_path).ok();
        std::fs::remove_dir(&dir).ok();
    }
    /// Reloading a modified file replaces the stale entry in place, so the
    /// cache still holds exactly one entry for the path.
    #[test]
    fn test_cache_stats_after_reload() {
        let cache = create_test_cache();
        let dir = std::env::temp_dir().join("atomic_http_test_stats");
        std::fs::create_dir_all(&dir).unwrap();
        let file_path = dir.join("test_stats.txt");
        std::fs::write(&file_path, b"data").unwrap();
        cache.load_file(&file_path).unwrap();
        assert_eq!(cache.stats().file_count, 1);
        // See the mtime-granularity note in the test above.
        std::thread::sleep(Duration::from_secs(1));
        std::fs::write(&file_path, b"updated_data").unwrap();
        cache.load_file(&file_path).unwrap();
        assert_eq!(cache.stats().file_count, 1);
        std::fs::remove_file(&file_path).ok();
        std::fs::remove_dir(&dir).ok();
    }
}