use crate::types::Query;
use parking_lot::Mutex;
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
use super::planner::PhysicalPlan;
/// Cache lookup key: the 32-byte BLAKE3 digest of a query's normalized
/// debug representation (see [`CacheKey::from_query`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CacheKey([u8; 32]);
impl CacheKey {
    /// Derives a key from a query by hashing its normalized `Debug` output.
    ///
    /// Two queries with identical debug representations (after
    /// normalization) therefore map to the same cache slot.
    pub(crate) fn from_query(query: &Query) -> Self {
        let debug_repr = format!("{:?}", query);
        let digest = blake3::hash(Self::normalize(&debug_repr).as_bytes());
        Self(*digest.as_bytes())
    }

    /// Normalizes a raw query string: trims surrounding whitespace and
    /// lowercases the leading identifier (the operation name), leaving the
    /// remainder — which may contain case-sensitive payload — untouched.
    /// If the whole string is one identifier, it is lowercased entirely.
    pub(crate) fn normalize(raw: &str) -> String {
        let trimmed = raw.trim();
        // First character that is not part of the leading identifier.
        match trimmed.find(|c: char| !c.is_alphanumeric() && c != '_') {
            Some(boundary) => {
                let mut normalized = trimmed[..boundary].to_lowercase();
                normalized.push_str(&trimmed[boundary..]);
                normalized
            }
            None => trimmed.to_lowercase(),
        }
    }

    // NOTE(review): currently unused in production paths; presumably kept
    // for diagnostics or future use — confirm before removing.
    #[allow(dead_code)]
    fn from_str(s: &str) -> Self {
        let digest = blake3::hash(s.as_bytes());
        Self(*digest.as_bytes())
    }
}
/// A cached physical plan together with its bookkeeping metadata.
#[derive(Debug, Clone)]
pub struct CachedPlan {
    /// The physical plan produced by the planner.
    pub plan: PhysicalPlan,
    /// Insertion time; used for TTL expiry checks.
    pub cached_at: Instant,
    /// Number of cache hits this entry has served.
    pub hit_count: u64,
    /// Normalized query text; used by prefix/substring invalidation.
    pub normalized_query: String,
}
/// Tuning knobs for [`PlanCache`].
#[derive(Debug, Clone)]
pub struct PlanCacheConfig {
    /// Maximum number of cached plans before LRU eviction kicks in.
    pub max_entries: usize,
    /// Time-to-live: entries older than this are discarded on access.
    pub ttl: Duration,
}
impl Default for PlanCacheConfig {
fn default() -> Self {
Self {
max_entries: 1000,
ttl: Duration::from_secs(300), }
}
}
/// Point-in-time counters describing cache effectiveness.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Lookups that found a live (non-expired) entry.
    pub hits: u64,
    /// Lookups that found nothing, or found an expired entry.
    pub misses: u64,
    /// Entries removed by LRU pressure, TTL expiry, or invalidation.
    pub evictions: u64,
    /// Current number of cached entries.
    pub size: usize,
}
/// Thread-safe LRU + TTL cache of physical plans.
///
/// `entries` holds the plans, `lru_order` tracks recency (front = least
/// recently used), and `stats` tracks counters. All methods acquire
/// `entries` first, which serializes operations across the three locks.
pub struct PlanCache {
    entries: Mutex<HashMap<CacheKey, CachedPlan>>,
    lru_order: Mutex<VecDeque<CacheKey>>,
    config: PlanCacheConfig,
    stats: Mutex<CacheStats>,
}
impl PlanCache {
    /// Creates an empty plan cache with the given capacity and TTL settings.
    pub fn new(config: PlanCacheConfig) -> Self {
        Self {
            entries: Mutex::new(HashMap::new()),
            lru_order: Mutex::new(VecDeque::new()),
            config,
            stats: Mutex::new(CacheStats::default()),
        }
    }

    /// Returns a clone of the cached plan for `key`, or `None` on a miss.
    ///
    /// A hit bumps the entry's hit count and moves it to the
    /// most-recently-used position. An entry older than `config.ttl` is
    /// removed and counted as both a miss and an eviction.
    pub fn get(&self, key: &CacheKey) -> Option<PhysicalPlan> {
        // Lock order is entries -> lru_order -> stats, matching every other
        // method here; `entries` taken first serializes all operations.
        let mut entries = self.entries.lock();
        let mut lru = self.lru_order.lock();
        let mut stats = self.stats.lock();
        if let Some(entry) = entries.get_mut(key) {
            if entry.cached_at.elapsed() > self.config.ttl {
                // Expired: discard and report as a miss plus an eviction.
                entries.remove(key);
                lru.retain(|k| k != key);
                stats.misses += 1;
                stats.evictions += 1;
                stats.size = entries.len();
                return None;
            }
            entry.hit_count += 1;
            stats.hits += 1;
            // Refresh recency: remove then re-append at the back. The O(n)
            // scan is acceptable at this cache's configured sizes.
            lru.retain(|k| k != key);
            lru.push_back(*key);
            Some(entry.plan.clone())
        } else {
            stats.misses += 1;
            None
        }
    }

    /// Inserts (or replaces) the plan for `key`, evicting least-recently-used
    /// entries as needed to stay within `config.max_entries`.
    pub fn insert(&self, key: CacheKey, plan: PhysicalPlan, normalized_query: String) {
        // A zero-capacity cache stores nothing; previously a single entry
        // would still slip in because the eviction loop breaks on an empty
        // LRU queue.
        if self.config.max_entries == 0 {
            return;
        }
        let mut entries = self.entries.lock();
        let mut lru = self.lru_order.lock();
        let mut stats = self.stats.lock();
        if entries.contains_key(&key) {
            // Replacing in place: drop the stale LRU slot, but do NOT run
            // the eviction loop — the map's size will not grow, so evicting
            // an unrelated entry here would be spurious.
            lru.retain(|k| k != &key);
        } else {
            // Make room for one new entry.
            while entries.len() >= self.config.max_entries {
                if let Some(evicted_key) = lru.pop_front() {
                    entries.remove(&evicted_key);
                    stats.evictions += 1;
                } else {
                    break;
                }
            }
        }
        entries.insert(
            key,
            CachedPlan {
                plan,
                cached_at: Instant::now(),
                hit_count: 0,
                normalized_query,
            },
        );
        lru.push_back(key);
        stats.size = entries.len();
    }

    /// Removes every entry; all removals are counted as evictions.
    pub fn invalidate_all(&self) {
        let mut entries = self.entries.lock();
        let mut lru = self.lru_order.lock();
        let mut stats = self.stats.lock();
        let evicted = entries.len() as u64;
        entries.clear();
        lru.clear();
        stats.evictions += evicted;
        stats.size = 0;
    }

    /// Removes every entry whose normalized query CONTAINS `prefix`
    /// (trimmed, lowercased).
    ///
    /// NOTE(review): despite the name this is substring containment, not a
    /// strict prefix match — callers rely on matching a collection name
    /// anywhere in the normalized query, so the behavior is kept as-is.
    pub fn invalidate_prefix(&self, prefix: &str) {
        let normalized_prefix = prefix.trim().to_lowercase();
        let mut entries = self.entries.lock();
        let mut lru = self.lru_order.lock();
        let mut stats = self.stats.lock();
        let keys_to_remove: Vec<CacheKey> = entries
            .iter()
            .filter(|(_, v)| v.normalized_query.contains(&normalized_prefix))
            .map(|(k, _)| *k)
            .collect();
        for key in &keys_to_remove {
            entries.remove(key);
            stats.evictions += 1;
        }
        lru.retain(|k| !keys_to_remove.contains(k));
        stats.size = entries.len();
    }

    /// Returns a snapshot of the current counters.
    pub fn cache_stats(&self) -> CacheStats {
        let stats = self.stats.lock();
        stats.clone()
    }
}
impl std::fmt::Debug for PlanCache {
    /// Renders the configuration plus a stats snapshot; the entry map
    /// itself is deliberately omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let snapshot = self.cache_stats();
        let mut dbg = f.debug_struct("PlanCache");
        dbg.field("max_entries", &self.config.max_entries);
        dbg.field("ttl", &self.config.ttl);
        dbg.field("stats", &snapshot);
        dbg.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::super::planner::QueryPlanner;
    use super::*;
    use crate::error::Result;
    use crate::types::Key;

    // Same query planned twice: first call misses and populates the cache,
    // second call hits and yields an equivalent plan.
    #[test]
    fn test_cache_hit_for_same_query() -> Result<()> {
        let planner = QueryPlanner::new().with_cache(PlanCacheConfig::default());
        let query = Query::Get {
            collection: "users".to_string(),
            key: Key::from_str("user:1"),
        };
        let plan1 = planner.plan(&query)?;
        let stats1 = planner.cache_stats();
        assert_eq!(stats1.misses, 1, "first call should be a miss");
        assert_eq!(stats1.hits, 0);
        assert_eq!(stats1.size, 1, "one entry should be cached");
        let plan2 = planner.plan(&query)?;
        let stats2 = planner.cache_stats();
        assert_eq!(stats2.hits, 1, "second call should be a hit");
        assert_eq!(stats2.misses, 1, "miss count should not change");
        // Plans compared via Debug output since PhysicalPlan may not be Eq.
        assert_eq!(format!("{:?}", plan1), format!("{:?}", plan2));
        Ok(())
    }

    // Distinct keys hash to distinct cache keys, so both queries miss.
    #[test]
    fn test_cache_miss_for_different_queries() -> Result<()> {
        let planner = QueryPlanner::new().with_cache(PlanCacheConfig::default());
        let query_a = Query::Get {
            collection: "users".to_string(),
            key: Key::from_str("user:1"),
        };
        let query_b = Query::Get {
            collection: "users".to_string(),
            key: Key::from_str("user:2"),
        };
        let _plan_a = planner.plan(&query_a)?;
        let _plan_b = planner.plan(&query_b)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.misses, 2, "both queries should miss");
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.size, 2);
        Ok(())
    }

    // An entry older than the TTL is dropped on access and counted as
    // both a miss and an eviction.
    #[test]
    fn test_cache_ttl_expiry() -> Result<()> {
        let config = PlanCacheConfig {
            max_entries: 100,
            ttl: Duration::from_millis(50), };
        let planner = QueryPlanner::new().with_cache(config);
        let query = Query::Get {
            collection: "items".to_string(),
            key: Key::from_str("item:1"),
        };
        let _plan1 = planner.plan(&query)?;
        let stats1 = planner.cache_stats();
        assert_eq!(stats1.misses, 1);
        // Sleep past the 50ms TTL so the cached entry expires.
        std::thread::sleep(Duration::from_millis(100));
        let _plan2 = planner.plan(&query)?;
        let stats2 = planner.cache_stats();
        assert_eq!(stats2.misses, 2, "expired entry should cause a miss");
        assert_eq!(
            stats2.evictions, 1,
            "expired entry should count as eviction"
        );
        Ok(())
    }

    // Filling the cache to capacity then adding one more entry evicts the
    // least-recently-used key (key:0), while recently-used keys survive.
    #[test]
    fn test_cache_lru_eviction() -> Result<()> {
        let config = PlanCacheConfig {
            max_entries: 3,
            ttl: Duration::from_secs(300),
        };
        let planner = QueryPlanner::new().with_cache(config);
        for i in 0..3 {
            let query = Query::Get {
                collection: "data".to_string(),
                key: Key::from_str(&format!("key:{}", i)),
            };
            let _plan = planner.plan(&query)?;
        }
        let stats = planner.cache_stats();
        assert_eq!(stats.size, 3);
        assert_eq!(stats.evictions, 0);
        let query_new = Query::Get {
            collection: "data".to_string(),
            key: Key::from_str("key:3"),
        };
        let _plan = planner.plan(&query_new)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.size, 3, "size should remain at max_entries");
        assert_eq!(stats.evictions, 1, "one entry should have been evicted");
        let query_1 = Query::Get {
            collection: "data".to_string(),
            key: Key::from_str("key:1"),
        };
        let _plan = planner.plan(&query_1)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.hits, 1, "key:1 should still be in cache");
        let query_0 = Query::Get {
            collection: "data".to_string(),
            key: Key::from_str("key:0"),
        };
        let _plan = planner.plan(&query_0)?;
        let stats = planner.cache_stats();
        assert!(
            stats.misses >= 5,
            "key:0 should have been evicted and cause a miss"
        );
        Ok(())
    }

    // Counters start at zero and track hits/misses/size exactly.
    #[test]
    fn test_cache_stats_accuracy() -> Result<()> {
        let config = PlanCacheConfig {
            max_entries: 10,
            ttl: Duration::from_secs(300),
        };
        let planner = QueryPlanner::new().with_cache(config);
        let stats = planner.cache_stats();
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);
        assert_eq!(stats.evictions, 0);
        assert_eq!(stats.size, 0);
        let query = Query::Get {
            collection: "stats_test".to_string(),
            key: Key::from_str("k1"),
        };
        let _p = planner.plan(&query)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.size, 1);
        for _ in 0..5 {
            let _p = planner.plan(&query)?;
        }
        let stats = planner.cache_stats();
        assert_eq!(stats.hits, 5);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.size, 1);
        Ok(())
    }

    // invalidate_all empties the cache, counts every removal as an
    // eviction, and forces subsequent plans to miss.
    #[test]
    fn test_cache_invalidate_all() -> Result<()> {
        let planner = QueryPlanner::new().with_cache(PlanCacheConfig::default());
        for i in 0..5 {
            let query = Query::Get {
                collection: "inv_all".to_string(),
                key: Key::from_str(&format!("k:{}", i)),
            };
            let _p = planner.plan(&query)?;
        }
        let stats = planner.cache_stats();
        assert_eq!(stats.size, 5);
        planner.invalidate_all();
        let stats = planner.cache_stats();
        assert_eq!(stats.size, 0, "all entries should be removed");
        assert_eq!(stats.evictions, 5, "all removed entries count as evictions");
        let query = Query::Get {
            collection: "inv_all".to_string(),
            key: Key::from_str("k:0"),
        };
        let _p = planner.plan(&query)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.misses, 6, "re-plan after invalidation should miss");
        Ok(())
    }

    // invalidate_prefix("orders") matches the collection name inside the
    // normalized query text, removing orders entries but keeping products.
    #[test]
    fn test_cache_invalidate_prefix() -> Result<()> {
        let planner = QueryPlanner::new().with_cache(PlanCacheConfig::default());
        for i in 0..3 {
            let query = Query::Get {
                collection: "orders".to_string(),
                key: Key::from_str(&format!("o:{}", i)),
            };
            let _p = planner.plan(&query)?;
        }
        for i in 0..2 {
            let query = Query::Get {
                collection: "products".to_string(),
                key: Key::from_str(&format!("p:{}", i)),
            };
            let _p = planner.plan(&query)?;
        }
        let stats = planner.cache_stats();
        assert_eq!(stats.size, 5);
        planner.invalidate_prefix("orders");
        let stats = planner.cache_stats();
        assert_eq!(stats.size, 2, "only products should remain");
        assert_eq!(stats.evictions, 3, "3 orders entries evicted");
        let query = Query::Get {
            collection: "products".to_string(),
            key: Key::from_str("p:0"),
        };
        let _p = planner.plan(&query)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.hits, 1, "products entry should still be cached");
        Ok(())
    }

    // 8 threads x 10 plans over 20 distinct keys: totals must be exact and
    // key overlap (thread_id % 2) guarantees some hits.
    #[test]
    fn test_cache_concurrent_access() -> Result<()> {
        use std::sync::Arc;
        let config = PlanCacheConfig {
            max_entries: 100,
            ttl: Duration::from_secs(300),
        };
        let planner = Arc::new(QueryPlanner::new().with_cache(config));
        let mut handles = Vec::new();
        for thread_id in 0..8 {
            let planner_clone = Arc::clone(&planner);
            let handle = std::thread::spawn(move || -> Result<()> {
                for i in 0..10 {
                    let query = Query::Get {
                        collection: "concurrent".to_string(),
                        key: Key::from_str(&format!("k:{}:{}", thread_id % 2, i)),
                    };
                    let _plan = planner_clone.plan(&query)?;
                }
                Ok(())
            });
            handles.push(handle);
        }
        for handle in handles {
            handle.join().expect("thread should not panic")?;
        }
        let stats = planner.cache_stats();
        assert!(stats.size <= 20, "should have at most 20 entries");
        let total = stats.hits + stats.misses;
        assert_eq!(total, 80, "total ops should be 80");
        assert!(
            stats.hits > 0,
            "should have some cache hits from concurrent access"
        );
        Ok(())
    }

    // normalize trims whitespace and lowercases only the leading operation
    // identifier.
    #[test]
    fn test_cache_key_normalization() {
        let key_a = CacheKey::normalize(" Filter { collection: \"x\" } ");
        let key_b = CacheKey::normalize("Filter { collection: \"x\" }");
        assert_eq!(key_a, key_b);
        let normalized = CacheKey::normalize("FILTER { collection: \"x\" }");
        assert!(normalized.starts_with("filter"));
    }

    // A planner built without a cache still plans correctly, reports zeroed
    // stats, and tolerates invalidation calls as no-ops.
    #[test]
    fn test_planner_without_cache() -> Result<()> {
        use super::super::planner::PhysicalPlan;
        let planner = QueryPlanner::new();
        assert!(planner.plan_cache().is_none());
        let query = Query::Get {
            collection: "no_cache".to_string(),
            key: Key::from_str("k1"),
        };
        let plan = planner.plan(&query)?;
        assert!(matches!(plan, PhysicalPlan::PointGet { .. }));
        let stats = planner.cache_stats();
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);
        planner.invalidate_all();
        planner.invalidate_prefix("anything");
        Ok(())
    }

    // Filter queries (FHE predicate plans) cache the same way as point gets.
    #[test]
    fn test_cache_with_filter_queries() -> Result<()> {
        use super::super::planner::PhysicalPlan;
        use crate::types::{CipherBlob, Predicate, col};
        let planner = QueryPlanner::new().with_cache(PlanCacheConfig::default());
        let query = Query::Filter {
            collection: "users".to_string(),
            predicate: Predicate::Gt(col("age"), CipherBlob::new(vec![18])),
        };
        let plan1 = planner.plan(&query)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.misses, 1);
        let plan2 = planner.plan(&query)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.hits, 1);
        assert!(matches!(plan1, PhysicalPlan::FheFilter { .. }));
        assert!(matches!(plan2, PhysicalPlan::FheFilter { .. }));
        Ok(())
    }

    // Debug output exposes config and stats, not the entry map.
    #[test]
    fn test_plan_cache_debug() {
        let cache = PlanCache::new(PlanCacheConfig::default());
        let debug_str = format!("{:?}", cache);
        assert!(debug_str.contains("PlanCache"));
        assert!(debug_str.contains("max_entries"));
    }

    // Accessing an entry refreshes its LRU position, so a recently-read
    // key survives eviction while an untouched one is dropped first.
    #[test]
    fn test_cache_lru_order_updated_on_access() -> Result<()> {
        let config = PlanCacheConfig {
            max_entries: 3,
            ttl: Duration::from_secs(300),
        };
        let planner = QueryPlanner::new().with_cache(config);
        for i in 0..3 {
            let query = Query::Get {
                collection: "lru".to_string(),
                key: Key::from_str(&format!("key:{}", i)),
            };
            let _p = planner.plan(&query)?;
        }
        let query_0 = Query::Get {
            collection: "lru".to_string(),
            key: Key::from_str("key:0"),
        };
        let _p = planner.plan(&query_0)?;
        let query_3 = Query::Get {
            collection: "lru".to_string(),
            key: Key::from_str("key:3"),
        };
        let _p = planner.plan(&query_3)?;
        let _p = planner.plan(&query_0)?;
        let stats = planner.cache_stats();
        assert!(
            stats.hits >= 2,
            "key:0 should still be in cache after LRU reorder"
        );
        let query_1 = Query::Get {
            collection: "lru".to_string(),
            key: Key::from_str("key:1"),
        };
        let _p = planner.plan(&query_1)?;
        let stats = planner.cache_stats();
        assert!(
            stats.evictions >= 2,
            "key:1 eviction + new eviction for reinsertion"
        );
        Ok(())
    }
}