use crate::compression::{CompressionAlgorithm, Compressor};
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::collections::HashMap;
use std::path::PathBuf;
use thiserror::Error;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// Errors produced by [`TieredCache`] operations.
#[derive(Debug, Error)]
pub enum TieredCacheError {
/// An I/O operation failed. Converted automatically from [`std::io::Error`] via `?`;
/// compressor failures in the disk helpers are also wrapped into this variant.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// The given key is not present. (Not constructed anywhere in the code visible here.)
#[error("Key not found: {0}")]
KeyNotFound(String),
/// A tier has no room for the item; `tier` carries the tier's name.
#[error("Tier full: {tier}")]
TierFull { tier: String },
/// A serialization failure described by the contained message.
/// (Not constructed anywhere in the code visible here.)
#[error("Serialization error: {0}")]
Serialization(String),
}
/// Identifies which of the three cache levels holds an entry.
///
/// The explicit discriminants (1, 2, 3) together with the derived `Ord` make
/// `L1 < L2 < L3`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum CacheTier {
/// First level; labelled "L1-Memory" by [`CacheTier::name`].
L1 = 1,
/// Second level; labelled "L2-SSD" by [`CacheTier::name`].
L2 = 2,
/// Third level; labelled "L3-HDD" by [`CacheTier::name`].
L3 = 3,
}
impl CacheTier {
    /// Returns the fixed, human-readable label of this tier.
    #[must_use]
    #[inline]
    pub const fn name(&self) -> &'static str {
        match *self {
            Self::L3 => "L3-HDD",
            Self::L2 => "L2-SSD",
            Self::L1 => "L1-Memory",
        }
    }
}
/// Per-key bookkeeping record kept for every cached entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CacheItemMetadata {
/// The cache key this record describes.
key: String,
/// Size used for capacity accounting, in bytes.
size_bytes: u64,
/// Tier the entry is currently accounted to.
tier: CacheTier,
/// Number of accesses recorded through `record_access`.
access_count: u64,
/// Wall-clock time of the most recent access, in milliseconds since the Unix epoch.
last_access_ms: i64,
/// Wall-clock time at which this record was created, in milliseconds since the Unix epoch.
created_ms: i64,
}
impl CacheItemMetadata {
    /// Returns the current wall-clock time in milliseconds since the Unix epoch.
    ///
    /// A clock set before the epoch yields `0`; a value that does not fit in `i64`
    /// saturates at `i64::MAX` instead of silently wrapping.
    fn now_ms() -> i64 {
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default();
        i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
    }

    /// Creates a record for `key` with zero accesses, stamping both the creation and the
    /// last-access time with the current time.
    fn new(key: String, size_bytes: u64, tier: CacheTier) -> Self {
        let now_ms = Self::now_ms();
        Self {
            key,
            size_bytes,
            tier,
            access_count: 0,
            last_access_ms: now_ms,
            created_ms: now_ms,
        }
    }

    /// Counts one access and refreshes the last-access timestamp.
    fn record_access(&mut self) {
        // Saturate rather than overflow: the counter is only compared against a threshold.
        self.access_count = self.access_count.saturating_add(1);
        self.last_access_ms = Self::now_ms();
    }

    /// Returns `true` once the access count has reached `threshold`.
    #[must_use]
    #[inline]
    const fn should_promote(&self, threshold: u64) -> bool {
        self.access_count >= threshold
    }
}
/// Settings for a [`TieredCache`].
#[derive(Debug, Clone)]
pub struct TieredCacheConfig {
/// Byte budget of the in-memory L1 tier.
pub l1_capacity_bytes: u64,
/// Byte budget of the L2 tier.
pub l2_capacity_bytes: u64,
/// Byte budget of the L3 tier.
pub l3_capacity_bytes: u64,
/// Directory in which L2 entries are stored as files.
pub l2_path: PathBuf,
/// Directory in which L3 entries are stored as files.
pub l3_path: PathBuf,
/// Access count at which an entry read from L2 or L3 is moved one tier up.
pub promotion_threshold: u64,
/// Compression applied to data written to the L2 and L3 files.
pub compression: CompressionAlgorithm,
}
impl Default for TieredCacheConfig {
    /// Defaults: 100 MiB for L1, 1 GiB for L2, 10 GiB for L3, directories
    /// `./cache/l2` and `./cache/l3`, promotion after 3 accesses, and
    /// `CompressionAlgorithm::Balanced`.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        const GIB: u64 = 1024 * MIB;

        Self {
            l1_capacity_bytes: 100 * MIB,
            l2_capacity_bytes: GIB,
            l3_capacity_bytes: 10 * GIB,
            l2_path: PathBuf::from("./cache/l2"),
            l3_path: PathBuf::from("./cache/l3"),
            promotion_threshold: 3,
            compression: CompressionAlgorithm::Balanced,
        }
    }
}
/// Counters describing how lookups were served and how entries moved between tiers.
#[derive(Debug, Clone, Default)]
pub struct TieredCacheStats {
    /// Lookups served from L1.
    pub l1_hits: u64,
    /// Lookups served from L2.
    pub l2_hits: u64,
    /// Lookups served from L3.
    pub l3_hits: u64,
    /// Lookups for keys the cache does not know.
    pub misses: u64,
    /// Entries moved from L2 up to L1.
    pub promotions_l2_to_l1: u64,
    /// Entries moved from L3 up to L2.
    pub promotions_l3_to_l2: u64,
    /// Entries moved from L1 down to L2.
    pub demotions_l1_to_l2: u64,
    /// Entries moved from L2 down to L3.
    pub demotions_l2_to_l3: u64,
    /// Entries dropped from L3 altogether.
    pub evictions: u64,
}

impl TieredCacheStats {
    /// Total number of hits across all three tiers.
    #[inline]
    fn hits_total(&self) -> u64 {
        self.l1_hits + self.l2_hits + self.l3_hits
    }

    /// Fraction of all lookups (hits and misses) served from L1; `0.0` when nothing was
    /// looked up yet.
    #[must_use]
    #[inline]
    pub fn l1_hit_rate(&self) -> f64 {
        match self.hits_total() + self.misses {
            0 => 0.0,
            lookups => self.l1_hits as f64 / lookups as f64,
        }
    }

    /// Fraction of all lookups served from any tier; `0.0` when nothing was looked up yet.
    #[must_use]
    #[inline]
    pub fn overall_hit_rate(&self) -> f64 {
        let served = self.hits_total();
        match served + self.misses {
            0 => 0.0,
            lookups => served as f64 / lookups as f64,
        }
    }

    /// Mean tier number (1, 2 or 3) over all hits; `0.0` when there were no hits.
    #[must_use]
    #[inline]
    pub fn average_tier(&self) -> f64 {
        let served = self.hits_total();
        if served == 0 {
            return 0.0;
        }
        let weighted =
            self.l1_hits as f64 + self.l2_hits as f64 * 2.0 + self.l3_hits as f64 * 3.0;
        weighted / served as f64
    }
}
/// Three-level cache: an in-memory map (L1) backed by two directories of files (L2 and L3).
///
/// Each stored key has one `metadata` record naming the tier that currently holds its
/// bytes. The `*_used` counters track the sizes recorded in those records, i.e. the
/// length of the data as passed to `put`, not the on-disk (possibly compressed) size.
pub struct TieredCache {
/// Capacities, directories, promotion threshold and compression choice.
config: TieredCacheConfig,
/// Bytes of the entries held in L1, by key.
l1: HashMap<String, Vec<u8>>,
/// Bookkeeping record for every cached key, regardless of tier.
metadata: HashMap<String, CacheItemMetadata>,
/// Bytes currently accounted to L1.
l1_used: u64,
/// Bytes currently accounted to L2.
l2_used: u64,
/// Bytes currently accounted to L3.
l3_used: u64,
/// Hit, miss and tier-movement counters.
stats: TieredCacheStats,
/// Compressor used by the disk read/write helpers; kept in a `RefCell` so those
/// `&self` helpers can borrow it mutably.
compressor: RefCell<Compressor>,
}
impl TieredCache {
/// Builds an empty cache from `config`.
///
/// The L2 and L3 directories (including missing parents) are created first.
///
/// # Errors
/// Returns `TieredCacheError::Io` if either directory cannot be created.
pub async fn new(config: TieredCacheConfig) -> Result<Self, TieredCacheError> {
    for dir in [&config.l2_path, &config.l3_path] {
        fs::create_dir_all(dir).await?;
    }

    Ok(Self {
        // Read the compression setting before `config` is moved into the struct.
        compressor: RefCell::new(Compressor::new(config.compression)),
        config,
        l1: HashMap::new(),
        metadata: HashMap::new(),
        l1_used: 0,
        l2_used: 0,
        l3_used: 0,
        stats: TieredCacheStats::default(),
    })
}
/// Stores `data` under `key`, replacing any previous value for that key.
///
/// Placement is tried in L1 first. If L1 lacks room and the item could fit at all, the
/// least-recently-used L1 entry is demoted once; if there is still no room the item is
/// handed to L2 (which may in turn fall through to L3).
///
/// # Errors
/// Returns `TieredCacheError::Io` on disk failures and `TieredCacheError::TierFull`
/// when no tier can take the item. In the error case a previous value for `key` has
/// already been dropped, so the key is simply absent afterwards.
pub async fn put(&mut self, key: String, data: Vec<u8>) -> Result<(), TieredCacheError> {
    let size = data.len() as u64;

    // Release the previous value completely. `remove_from_tier` needs the metadata entry
    // to learn the size, so the entry is dropped only afterwards. Dropping it matters:
    // a stale record would otherwise be picked as an eviction victim, or survive a
    // failed placement and describe bytes that no longer exist.
    if let Some(previous_tier) = self.metadata.get(&key).map(|meta| meta.tier) {
        self.remove_from_tier(&key, previous_tier).await?;
        self.metadata.remove(&key);
    }

    let l1_capacity = self.config.l1_capacity_bytes;
    // Demoting an L1 resident only makes sense if the new item can fit in L1 at all.
    if size <= l1_capacity && self.l1_used + size > l1_capacity {
        self.evict_from_l1().await?;
    }

    if self.l1_used + size <= l1_capacity {
        self.l1.insert(key.clone(), data);
        self.l1_used += size;
        self.metadata.insert(
            key.clone(),
            CacheItemMetadata::new(key, size, CacheTier::L1),
        );
        Ok(())
    } else {
        self.place_in_l2(key, data, size).await
    }
}
/// Looks up `key` and returns a copy of its bytes, or `None` if the key is unknown.
///
/// Every successful lookup records an access. A hit in L2 or L3 whose access count has
/// reached the configured promotion threshold moves the entry one tier up (L2 to L1,
/// L3 to L2) before returning.
///
/// # Errors
/// Propagates failures from reading the tier file or from the promotion.
pub async fn get(&mut self, key: &str) -> Result<Option<Vec<u8>>, TieredCacheError> {
    let threshold = self.config.promotion_threshold;
    let Some(entry) = self.metadata.get_mut(key) else {
        self.stats.misses += 1;
        return Ok(None);
    };
    entry.record_access();
    let promote = entry.should_promote(threshold);
    let tier = entry.tier;

    let bytes = match tier {
        CacheTier::L1 => {
            self.stats.l1_hits += 1;
            return Ok(self.l1.get(key).cloned());
        }
        CacheTier::L2 => {
            self.stats.l2_hits += 1;
            let bytes = self.read_from_l2(key).await?;
            if promote {
                self.promote_to_l1(key.to_string(), bytes.clone()).await?;
            }
            bytes
        }
        CacheTier::L3 => {
            self.stats.l3_hits += 1;
            let bytes = self.read_from_l3(key).await?;
            if promote {
                self.promote_to_l2(key.to_string(), bytes.clone()).await?;
            }
            bytes
        }
    };
    Ok(Some(bytes))
}
/// Removes `key` from the cache, releasing its bytes and its share of the tier's usage.
///
/// Removing an unknown key is a no-op.
///
/// # Errors
/// Propagates any error from releasing the entry's tier storage.
pub async fn remove(&mut self, key: &str) -> Result<(), TieredCacheError> {
    let Some(tier) = self.metadata.get(key).map(|meta| meta.tier) else {
        return Ok(());
    };
    // `remove_from_tier` looks the entry up to learn its size, so it must run while the
    // metadata record still exists; deleting the record first would turn it into a no-op
    // and leak both the stored bytes and the usage accounting.
    self.remove_from_tier(key, tier).await?;
    self.metadata.remove(key);
    Ok(())
}
/// Returns a reference to the cache's hit, miss and tier-movement counters.
#[must_use]
#[inline]
pub const fn stats(&self) -> &TieredCacheStats {
&self.stats
}
/// Returns how full L1 is as `l1_used / l1_capacity_bytes`.
///
/// Despite the name, the value is a plain ratio (1.0 means exactly full); it is not
/// multiplied by 100. A capacity of zero yields `0.0`.
#[must_use]
#[inline]
pub fn l1_usage_percent(&self) -> f64 {
    match self.config.l1_capacity_bytes {
        0 => 0.0,
        capacity => self.l1_used as f64 / capacity as f64,
    }
}
/// Inserts every `(key, data)` pair via `put` and returns how many insertions succeeded.
///
/// Individual failures are skipped rather than reported, so this never returns `Err`.
pub async fn warm_with_data(
    &mut self,
    items: Vec<(String, Vec<u8>)>,
) -> Result<usize, TieredCacheError> {
    let mut stored = 0usize;
    for (key, data) in items {
        stored += usize::from(self.put(key, data).await.is_ok());
    }
    Ok(stored)
}
/// Re-registers `keys` by loading their bytes from the tier directories on disk.
///
/// For each key the L2 file is tried first; if it cannot be read, or the subsequent
/// `put` fails, the L3 file is tried. Returns the number of keys that were stored.
/// Failures for individual keys are skipped, so this never returns `Err`.
pub async fn warm_from_keys(&mut self, keys: &[String]) -> Result<usize, TieredCacheError> {
    let mut stored = 0;
    for key in keys {
        let mut loaded = false;
        if let Ok(bytes) = self.read_from_l2(key).await {
            loaded = self.put(key.clone(), bytes).await.is_ok();
        }
        if !loaded {
            if let Ok(bytes) = self.read_from_l3(key).await {
                loaded = self.put(key.clone(), bytes).await.is_ok();
            }
        }
        if loaded {
            stored += 1;
        }
    }
    Ok(stored)
}
/// Returns up to `limit` keys, ordered from most to least accessed.
///
/// Keys with equal access counts keep the map's iteration order relative to each other,
/// which is unspecified.
#[must_use]
pub fn export_hot_keys(&self, limit: usize) -> Vec<String> {
    let mut ranked: Vec<(&String, u64)> = self
        .metadata
        .iter()
        .map(|(key, meta)| (key, meta.access_count))
        .collect();
    // Stable sort, highest count first.
    ranked.sort_by_key(|&(_, count)| std::cmp::Reverse(count));
    ranked
        .into_iter()
        .take(limit)
        .map(|(key, _)| key.clone())
        .collect()
}
/// Returns the number of keys tracked by the cache across all tiers.
#[must_use]
#[inline]
pub fn len(&self) -> usize {
self.metadata.len()
}
/// Returns `true` when the cache tracks no keys in any tier.
#[must_use]
#[inline]
pub fn is_empty(&self) -> bool {
self.metadata.is_empty()
}
/// Stores a new entry in L2, falling through to L3 when L2 cannot take it.
///
/// If L2 lacks room, its least-recently-used entry is demoted to L3 once. An item
/// larger than L2's whole capacity skips that step (it could never fit, so demoting a
/// resident would only cost I/O) and goes straight to L3.
///
/// `size` is the value used for accounting and is expected to be `data`'s length.
///
/// # Errors
/// Propagates I/O errors and the `TierFull` error from L3 placement.
async fn place_in_l2(
    &mut self,
    key: String,
    data: Vec<u8>,
    size: u64,
) -> Result<(), TieredCacheError> {
    let capacity = self.config.l2_capacity_bytes;
    if size > capacity {
        return self.place_in_l3(key, data, size).await;
    }

    if self.l2_used + size > capacity {
        self.evict_from_l2().await?;
    }
    if self.l2_used + size > capacity {
        // One demotion was not enough; try the next tier instead.
        return self.place_in_l3(key, data, size).await;
    }

    self.write_to_l2(&key, &data).await?;
    self.l2_used += size;
    self.metadata.insert(
        key.clone(),
        CacheItemMetadata::new(key, size, CacheTier::L2),
    );
    Ok(())
}
/// Stores a new entry in L3, the last tier.
///
/// If L3 lacks room, its least-recently-used entry is evicted (dropped for good) once.
/// An item larger than L3's whole capacity is rejected up front, so no resident is
/// sacrificed for an insertion that cannot succeed.
///
/// `size` is the value used for accounting and is expected to be `data`'s length.
///
/// # Errors
/// Returns `TieredCacheError::TierFull` when the item does not fit, and propagates I/O
/// errors from writing the file.
async fn place_in_l3(
    &mut self,
    key: String,
    data: Vec<u8>,
    size: u64,
) -> Result<(), TieredCacheError> {
    let capacity = self.config.l3_capacity_bytes;
    let tier_full = || TieredCacheError::TierFull {
        tier: "L3".to_string(),
    };

    if size > capacity {
        return Err(tier_full());
    }
    if self.l3_used + size > capacity {
        self.evict_from_l3().await?;
    }
    if self.l3_used + size > capacity {
        return Err(tier_full());
    }

    self.write_to_l3(&key, &data).await?;
    self.l3_used += size;
    self.metadata.insert(
        key.clone(),
        CacheItemMetadata::new(key, size, CacheTier::L3),
    );
    Ok(())
}
/// Demotes the least-recently-used L1 entry to L2.
///
/// Does nothing when no L1 entry exists. The bytes are written to the L2 file *before*
/// they are dropped from memory, so a failed write leaves the entry intact in L1.
/// L2's capacity is not checked here.
///
/// # Errors
/// Propagates the error from writing the L2 file.
async fn evict_from_l1(&mut self) -> Result<(), TieredCacheError> {
    let victim = self
        .metadata
        .iter()
        .filter(|(_, meta)| meta.tier == CacheTier::L1)
        .min_by_key(|(_, meta)| meta.last_access_ms)
        .map(|(key, meta)| (key.clone(), meta.size_bytes));
    let Some((key, size)) = victim else {
        return Ok(());
    };
    let Some(data) = self.l1.get(&key) else {
        // The record claims L1 but no bytes are held; there is nothing to move.
        return Ok(());
    };

    // Persist first; only commit the bookkeeping once the data is safely in L2.
    self.write_to_l2(&key, data).await?;

    self.l1.remove(&key);
    self.l1_used -= size;
    self.l2_used += size;
    if let Some(meta) = self.metadata.get_mut(&key) {
        meta.tier = CacheTier::L2;
    }
    self.stats.demotions_l1_to_l2 += 1;
    Ok(())
}
/// Demotes the least-recently-used L2 entry to L3.
///
/// Does nothing when no L2 entry exists. Usage counters and the tier marker are updated
/// only after the L3 copy has been written, so a failed read or write leaves the entry
/// and the accounting unchanged in L2. L3's capacity is not checked here.
///
/// # Errors
/// Propagates errors from reading the L2 file or writing the L3 file.
async fn evict_from_l2(&mut self) -> Result<(), TieredCacheError> {
    let victim = self
        .metadata
        .iter()
        .filter(|(_, meta)| meta.tier == CacheTier::L2)
        .min_by_key(|(_, meta)| meta.last_access_ms)
        .map(|(key, meta)| (key.clone(), meta.size_bytes));
    let Some((key, size)) = victim else {
        return Ok(());
    };

    let data = self.read_from_l2(&key).await?;
    self.write_to_l3(&key, &data).await?;

    // The L3 copy exists now; removing the L2 file is best-effort cleanup.
    let _ = fs::remove_file(self.l2_path(&key)).await;
    self.l2_used -= size;
    self.l3_used += size;
    if let Some(meta) = self.metadata.get_mut(&key) {
        meta.tier = CacheTier::L3;
    }
    self.stats.demotions_l2_to_l3 += 1;
    Ok(())
}
/// Drops the least-recently-used L3 entry from the cache entirely.
///
/// Does nothing when no L3 entry exists. Failure to delete the file is ignored, so this
/// always returns `Ok`.
async fn evict_from_l3(&mut self) -> Result<(), TieredCacheError> {
    let oldest = self
        .metadata
        .iter()
        .filter(|(_, meta)| meta.tier == CacheTier::L3)
        .min_by_key(|(_, meta)| meta.last_access_ms)
        .map(|(key, _)| key.clone());

    let Some(key) = oldest else {
        return Ok(());
    };
    let Some(dropped) = self.metadata.remove(&key) else {
        return Ok(());
    };

    self.l3_used -= dropped.size_bytes;
    // Best-effort file cleanup.
    let _ = fs::remove_file(self.l3_path(&key)).await;
    self.stats.evictions += 1;
    Ok(())
}
/// Moves the entry for `key` from L2 or L3 into L1, demoting L1 residents to make room.
///
/// `data` must be the entry's bytes. The call is a no-op when the key is unknown or
/// already in L1. The promotion is skipped, leaving the entry where it is, when the
/// entry is larger than L1's whole capacity or when room cannot be freed. Without
/// these two checks the eviction loop would spin forever.
///
/// # Errors
/// Propagates errors from demoting L1 residents.
async fn promote_to_l1(&mut self, key: String, data: Vec<u8>) -> Result<(), TieredCacheError> {
    let Some((size, current_tier)) = self
        .metadata
        .get(&key)
        .map(|meta| (meta.size_bytes, meta.tier))
    else {
        return Ok(());
    };
    if current_tier == CacheTier::L1 {
        return Ok(());
    }

    let capacity = self.config.l1_capacity_bytes;
    if size > capacity {
        // Can never fit, no matter how much is evicted.
        return Ok(());
    }
    while self.l1_used + size > capacity {
        let residents_before = self.l1.len();
        self.evict_from_l1().await?;
        if self.l1.len() >= residents_before {
            // Nothing was evicted, so further attempts cannot free space.
            return Ok(());
        }
    }

    match current_tier {
        CacheTier::L2 => {
            self.l2_used -= size;
            // Best-effort cleanup of the lower-tier file.
            let _ = fs::remove_file(self.l2_path(&key)).await;
            self.stats.promotions_l2_to_l1 += 1;
        }
        CacheTier::L3 => {
            self.l3_used -= size;
            let _ = fs::remove_file(self.l3_path(&key)).await;
        }
        // Already handled above; kept so the match stays exhaustive.
        CacheTier::L1 => return Ok(()),
    }

    if let Some(meta) = self.metadata.get_mut(&key) {
        meta.tier = CacheTier::L1;
    }
    self.l1_used += size;
    self.l1.insert(key, data);
    Ok(())
}
/// Moves the entry for `key` from L3 into L2, demoting L2 residents to make room.
///
/// `data` must be the entry's bytes. The call is a no-op when the key is unknown or not
/// in L3. The promotion is skipped, leaving the entry in L3, when the entry is larger
/// than L2's whole capacity or when L2 has no resident left to demote. Without these
/// two checks the eviction loop would spin forever.
///
/// The L2 file is written before the L3 file is deleted, so a failed write does not
/// lose the data.
///
/// # Errors
/// Propagates errors from demoting L2 residents or writing the L2 file.
async fn promote_to_l2(&mut self, key: String, data: Vec<u8>) -> Result<(), TieredCacheError> {
    let Some((size, current_tier)) = self
        .metadata
        .get(&key)
        .map(|meta| (meta.size_bytes, meta.tier))
    else {
        return Ok(());
    };
    if current_tier != CacheTier::L3 {
        return Ok(());
    }

    let capacity = self.config.l2_capacity_bytes;
    if size > capacity {
        // Can never fit, no matter how much is demoted.
        return Ok(());
    }
    while self.l2_used + size > capacity {
        let has_l2_resident = self
            .metadata
            .values()
            .any(|meta| meta.tier == CacheTier::L2);
        if !has_l2_resident {
            // Nothing left to demote, so space cannot be freed.
            return Ok(());
        }
        self.evict_from_l2().await?;
    }

    self.write_to_l2(&key, &data).await?;
    // The L2 copy exists now; removing the L3 file is best-effort cleanup.
    let _ = fs::remove_file(self.l3_path(&key)).await;
    self.l3_used -= size;
    self.l2_used += size;
    if let Some(meta) = self.metadata.get_mut(&key) {
        meta.tier = CacheTier::L2;
    }
    self.stats.promotions_l3_to_l2 += 1;
    Ok(())
}
/// Releases the storage held for `key` in `tier` and subtracts its size from that
/// tier's usage counter.
///
/// The size is read from the key's metadata record, so the record must still exist when
/// this is called; without it the call does nothing. The record itself is not removed.
/// File deletion errors are ignored, so this always returns `Ok`.
async fn remove_from_tier(
    &mut self,
    key: &str,
    tier: CacheTier,
) -> Result<(), TieredCacheError> {
    let Some(size) = self.metadata.get(key).map(|meta| meta.size_bytes) else {
        return Ok(());
    };

    match tier {
        CacheTier::L1 => {
            self.l1.remove(key);
            self.l1_used -= size;
        }
        CacheTier::L2 => {
            let _ = fs::remove_file(self.l2_path(key)).await;
            self.l2_used -= size;
        }
        CacheTier::L3 => {
            let _ = fs::remove_file(self.l3_path(key)).await;
            self.l3_used -= size;
        }
    }
    Ok(())
}
fn l2_path(&self, key: &str) -> PathBuf {
self.config.l2_path.join(format!("{}.cache", key))
}
fn l3_path(&self, key: &str) -> PathBuf {
self.config.l3_path.join(format!("{}.cache", key))
}
async fn write_to_l2(&self, key: &str, data: &[u8]) -> Result<(), TieredCacheError> {
let path = self.l2_path(key);
let write_data = if !self.config.compression.is_none() {
self.compressor
.borrow_mut()
.compress(data)
.map_err(|e| TieredCacheError::Io(std::io::Error::other(e)))?
} else {
data.to_vec()
};
let mut file = fs::File::create(path).await?;
file.write_all(&write_data).await?;
file.sync_all().await?;
Ok(())
}
async fn write_to_l3(&self, key: &str, data: &[u8]) -> Result<(), TieredCacheError> {
let path = self.l3_path(key);
let write_data = if !self.config.compression.is_none() {
self.compressor
.borrow_mut()
.compress(data)
.map_err(|e| TieredCacheError::Io(std::io::Error::other(e)))?
} else {
data.to_vec()
};
let mut file = fs::File::create(path).await?;
file.write_all(&write_data).await?;
file.sync_all().await?;
Ok(())
}
async fn read_from_l2(&self, key: &str) -> Result<Vec<u8>, TieredCacheError> {
let path = self.l2_path(key);
let mut file = fs::File::open(path).await?;
let mut compressed_data = Vec::new();
file.read_to_end(&mut compressed_data).await?;
let data = if !self.config.compression.is_none() {
self.compressor
.borrow_mut()
.decompress(&compressed_data)
.map_err(|e| TieredCacheError::Io(std::io::Error::other(e)))?
} else {
compressed_data
};
Ok(data)
}
async fn read_from_l3(&self, key: &str) -> Result<Vec<u8>, TieredCacheError> {
let path = self.l3_path(key);
let mut file = fs::File::open(path).await?;
let mut compressed_data = Vec::new();
file.read_to_end(&mut compressed_data).await?;
let data = if !self.config.compression.is_none() {
self.compressor
.borrow_mut()
.decompress(&compressed_data)
.map_err(|e| TieredCacheError::Io(std::io::Error::other(e)))?
} else {
compressed_data
};
Ok(data)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
/// Builds a small uncompressed cache (100 / 200 / 300 byte tiers, promotion after two
/// accesses) rooted in a fresh temporary directory.
///
/// The `TempDir` is returned so the caller keeps the directory alive for the test.
async fn create_test_cache() -> (TempDir, TieredCache) {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let cache = TieredCache::new(TieredCacheConfig {
        l1_capacity_bytes: 100,
        l2_capacity_bytes: 200,
        l3_capacity_bytes: 300,
        l2_path: root.join("l2"),
        l3_path: root.join("l3"),
        promotion_threshold: 2,
        compression: CompressionAlgorithm::None,
    })
    .await
    .unwrap();
    (temp_dir, cache)
}
/// A freshly created cache accounts zero bytes in every tier.
#[tokio::test]
async fn test_tiered_cache_creation() {
    let (_temp, cache) = create_test_cache().await;
    for used in [cache.l1_used, cache.l2_used, cache.l3_used] {
        assert_eq!(used, 0);
    }
}
/// A small value is stored in L1 and read back as an L1 hit.
#[tokio::test]
async fn test_put_and_get_l1() {
    let (_temp, mut cache) = create_test_cache().await;
    let payload = b"small".to_vec();

    cache
        .put("key1".to_string(), payload.clone())
        .await
        .unwrap();

    let fetched = cache.get("key1").await.unwrap();
    assert_eq!(fetched, Some(payload));
    assert_eq!(cache.stats.l1_hits, 1);
}
/// Two 60-byte values do not both fit in the 100-byte L1, so inserting the second one
/// demotes at least one entry to L2.
#[tokio::test]
async fn test_automatic_demotion() {
    let (_temp, mut cache) = create_test_cache().await;
    for (key, fill) in [("key1", 1u8), ("key2", 2u8)] {
        cache.put(key.to_string(), vec![fill; 60]).await.unwrap();
    }
    assert!(cache.stats.demotions_l1_to_l2 >= 1);
}
/// An entry demoted to L2 returns to L1 once its access count reaches the promotion
/// threshold.
#[tokio::test]
async fn test_promotion_on_access() {
    let (_temp, mut cache) = create_test_cache().await;
    // The second insert pushes "key1" out of L1.
    cache.put("key1".to_string(), vec![1; 60]).await.unwrap();
    cache.put("key2".to_string(), vec![2; 60]).await.unwrap();

    for _ in 0..3 {
        let _ = cache.get("key1").await;
    }

    // Fail loudly if the record is missing instead of silently skipping the assertion.
    let meta = cache
        .metadata
        .get("key1")
        .expect("key1 should still be tracked after being read");
    assert_eq!(meta.tier, CacheTier::L1);
}
/// Two hits and one miss give an overall hit rate of roughly two thirds.
#[tokio::test]
async fn test_hit_rate_calculation() {
    let (_temp, mut cache) = create_test_cache().await;
    cache
        .put("key1".to_string(), b"data".to_vec())
        .await
        .unwrap();

    for key in ["key1", "key1", "nonexistent"] {
        let _ = cache.get(key).await;
    }

    let observed = cache.stats.overall_hit_rate();
    assert!((observed - 0.666).abs() < 0.01);
}
/// A removed key can no longer be read.
#[tokio::test]
async fn test_remove() {
    let (_temp, mut cache) = create_test_cache().await;
    cache
        .put("key1".to_string(), b"data".to_vec())
        .await
        .unwrap();

    let before = cache.get("key1").await.unwrap();
    assert!(before.is_some());

    cache.remove("key1").await.unwrap();

    let after = cache.get("key1").await.unwrap();
    assert!(after.is_none());
}
/// Warming with three pairs stores all three and makes each key readable.
#[tokio::test]
async fn test_warm_with_data() {
    let (_temp, mut cache) = create_test_cache().await;
    let seed = vec![
        ("key1".to_string(), b"data1".to_vec()),
        ("key2".to_string(), b"data2".to_vec()),
        ("key3".to_string(), b"data3".to_vec()),
    ];

    let stored = cache.warm_with_data(seed).await.unwrap();
    assert_eq!(stored, 3);

    for key in ["key1", "key2", "key3"] {
        assert!(cache.get(key).await.unwrap().is_some());
    }
}
/// A second cache pointed at the same directories can reload entries a first cache left
/// on disk, and serves the reloaded bytes back unchanged.
#[tokio::test]
async fn test_warm_from_keys() {
    let (_temp, mut cache) = create_test_cache().await;
    // 150-byte values exceed L1 (100 bytes), so both end up in files on disk.
    let first = vec![1u8; 150];
    let second = vec![2u8; 150];
    cache.put("key1".to_string(), first.clone()).await.unwrap();
    cache.put("key2".to_string(), second.clone()).await.unwrap();

    let config = TieredCacheConfig {
        l1_capacity_bytes: 100,
        l2_capacity_bytes: 200,
        l3_capacity_bytes: 300,
        l2_path: cache.config.l2_path.clone(),
        l3_path: cache.config.l3_path.clone(),
        promotion_threshold: 2,
        compression: CompressionAlgorithm::None,
    };
    let mut new_cache = TieredCache::new(config).await.unwrap();

    let keys = vec!["key1".to_string(), "key2".to_string()];
    let warmed = new_cache.warm_from_keys(&keys).await.unwrap();

    assert_eq!(warmed, 2);
    assert_eq!(new_cache.len(), 2);
    assert_eq!(new_cache.get("key1").await.unwrap(), Some(first));
    assert_eq!(new_cache.get("key2").await.unwrap(), Some(second));
}
/// The two most frequently read keys are the ones exported when the limit is two.
#[tokio::test]
async fn test_export_hot_keys() {
    let (_temp, mut cache) = create_test_cache().await;
    // (key, number of reads) in insertion order.
    let workload = [("hot1", 5), ("hot2", 3), ("cold", 1)];

    for (key, _) in workload {
        cache
            .put(key.to_string(), b"data".to_vec())
            .await
            .unwrap();
    }
    for (key, reads) in workload {
        for _ in 0..reads {
            let _ = cache.get(key).await;
        }
    }

    let hottest = cache.export_hot_keys(2);
    assert_eq!(hottest.len(), 2);
    assert!(hottest.contains(&"hot1".to_string()));
    assert!(hottest.contains(&"hot2".to_string()));
}
/// `len` and `is_empty` follow insertions and removals.
#[tokio::test]
async fn test_len_and_is_empty() {
    let (_temp, mut cache) = create_test_cache().await;
    assert!(cache.is_empty());
    assert_eq!(cache.len(), 0);

    let value = b"data".to_vec();

    cache.put("key1".to_string(), value.clone()).await.unwrap();
    assert!(!cache.is_empty());
    assert_eq!(cache.len(), 1);

    cache.put("key2".to_string(), value).await.unwrap();
    assert_eq!(cache.len(), 2);

    cache.remove("key1").await.unwrap();
    assert_eq!(cache.len(), 1);
}
}