use crate::query::plan::ExecutionPlan;
use crate::OxirsError;
use lru::LruCache;
use scirs2_core::metrics::Counter;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::num::NonZeroUsize;
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::Instant;
/// LRU cache of query plans keyed by a 64-bit query signature.
///
/// The entry store and the statistics snapshot are each wrapped in their own
/// `Arc<RwLock<..>>`. Hit, miss and eviction totals are tracked twice: in
/// `CacheStatistics` and in the three metric `Counter`s below.
pub struct LruQueryPlanCache {
/// LRU store mapping a query signature to its cached plan.
cache: Arc<RwLock<LruCache<u64, CachedPlan>>>,
/// Aggregate statistics returned (as a clone) by `statistics()`.
stats: Arc<RwLock<CacheStatistics>>,
/// Metric counter registered as `plan_cache.hits`.
hit_counter: Counter,
/// Metric counter registered as `plan_cache.misses`.
miss_counter: Counter,
/// Metric counter registered as `plan_cache.evictions`.
eviction_counter: Counter,
/// Configuration supplied to `new` (capacity, persistence, TTL).
config: CacheConfig,
}
/// A cached plan together with its bookkeeping metadata.
///
/// The type is serde-serializable; `persist` and `load` store it as JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedPlan {
/// Serializable rendering of the execution plan.
pub plan: SerializablePlan,
/// Hash of the query text; also the key under which the entry is stored.
pub signature: u64,
/// Millisecond timestamp stamped by `put` when the entry is created.
pub cached_at_ms: u128,
/// Number of times `get` has returned this entry.
pub access_count: u64,
/// Millisecond timestamp stamped by `get` on each access (and by `put` on creation).
pub last_accessed_ms: u128,
/// Cost estimate passed to `put` by the caller.
pub estimated_cost: f64,
/// Running average of execution times fed through `update_execution_time`;
/// `0.0` until the first sample is recorded.
pub avg_execution_time_ms: f64,
}
/// Serde-friendly mirror of `ExecutionPlan`.
///
/// Produced by `LruQueryPlanCache::convert_to_serializable`, which stores
/// patterns, expressions and variables as their `Debug`-formatted strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SerializablePlan {
/// Scan of a single triple pattern, kept as its `Debug` rendering.
TripleScan { pattern_desc: String },
/// Hash join of two sub-plans on the listed join variables.
HashJoin {
left: Box<SerializablePlan>,
right: Box<SerializablePlan>,
join_vars: Vec<String>,
},
/// Filter over a sub-plan; the condition is kept as its `Debug` rendering.
Filter {
input: Box<SerializablePlan>,
expr_desc: String,
},
/// Projection of a sub-plan onto the listed variables.
Project {
input: Box<SerializablePlan>,
variables: Vec<String>,
},
/// Union of two sub-plans.
Union {
left: Box<SerializablePlan>,
right: Box<SerializablePlan>,
},
/// Placeholder used for every `ExecutionPlan` variant that has no mirror above.
Empty,
}
/// Settings that govern an `LruQueryPlanCache`.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Maximum number of plans kept in the cache. A value of zero is not a
    /// usable capacity; the cache constructor substitutes 1000 in that case.
    pub max_size: usize,
    /// When `false`, `persist` and `load` return immediately without touching disk.
    pub enable_persistence: bool,
    /// File used by `persist` and `load`; required when persistence is enabled.
    pub persistence_path: Option<String>,
    /// Maximum entry age in milliseconds before a lookup treats it as expired;
    /// `None` disables expiry.
    pub ttl_ms: Option<u128>,
}

impl Default for CacheConfig {
    /// Returns the stock configuration: 1000 entries, persistence off, no
    /// persistence path, and a one-hour TTL.
    fn default() -> Self {
        CacheConfig {
            max_size: 1000,
            enable_persistence: false,
            persistence_path: None,
            // One hour expressed in milliseconds.
            ttl_ms: Some(60 * 60 * 1000),
        }
    }
}
/// Aggregate counters describing cache activity; all fields start at zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CacheStatistics {
/// Lookups that returned a cached plan.
pub hits: u64,
/// Lookups that returned nothing (absent or expired entry).
pub misses: u64,
/// Evictions recorded when inserting into a full cache.
pub evictions: u64,
/// Entry count as of the last operation that refreshed it.
pub current_size: usize,
/// Number of successful `put` calls, including overwrites of an existing key.
pub total_cached: u64,
}
impl LruQueryPlanCache {
    /// Creates an empty cache governed by `config`.
    ///
    /// A `max_size` of zero cannot be used as an LRU capacity, so the cache
    /// falls back to 1000 entries in that case.
    pub fn new(config: CacheConfig) -> Self {
        let capacity = NonZeroUsize::new(config.max_size)
            .unwrap_or(NonZeroUsize::new(1000).expect("1000 is non-zero"));
        Self {
            cache: Arc::new(RwLock::new(LruCache::new(capacity))),
            stats: Arc::new(RwLock::new(CacheStatistics::default())),
            hit_counter: Counter::new("plan_cache.hits".to_string()),
            miss_counter: Counter::new("plan_cache.misses".to_string()),
            eviction_counter: Counter::new("plan_cache.evictions".to_string()),
            config,
        }
    }

    /// Looks up the cached plan for `query`.
    ///
    /// On a hit the entry becomes the most recently used one, its
    /// `access_count` and `last_accessed_ms` are updated in place, and a clone
    /// is returned. An entry older than the configured TTL is removed and the
    /// lookup is recorded as a miss.
    ///
    /// Returns `None` on a miss, on expiry, or when the cache lock is poisoned
    /// (in the poisoned case no hit or miss is recorded).
    pub fn get(&self, query: &str) -> Option<CachedPlan> {
        let signature = Self::compute_signature(query);
        let started = Instant::now();
        let now = Self::now_ms();

        // The whole lookup runs under one write lock so that concurrent
        // updates or removals of the same entry cannot be lost or undone.
        let hit = {
            let mut cache = self.cache.write().ok()?;
            // `peek` does not touch recency, so an expired entry is not promoted.
            let expired = match (cache.peek(&signature), self.config.ttl_ms) {
                // Saturating: a backwards clock step yields age 0, never an underflow.
                (Some(entry), Some(ttl)) => now.saturating_sub(entry.cached_at_ms) > ttl,
                _ => false,
            };
            if expired {
                cache.pop(&signature);
                self.sync_size(cache.len());
                None
            } else {
                cache.get_mut(&signature).map(|entry| {
                    entry.access_count += 1;
                    entry.last_accessed_ms = now;
                    entry.clone()
                })
            }
        };

        tracing::trace!(
            "Plan cache lookup for {:016x} took {:?}",
            signature,
            started.elapsed()
        );

        if hit.is_some() {
            self.record_hit();
        } else {
            self.record_miss();
        }
        hit
    }

    /// Stores `plan` for `query`, replacing any entry with the same signature.
    ///
    /// An eviction is recorded only when a new key is inserted into a full
    /// cache; overwriting an existing key evicts nothing.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Query` when the cache or statistics lock is poisoned.
    pub fn put(
        &self,
        query: &str,
        plan: ExecutionPlan,
        estimated_cost: f64,
    ) -> Result<(), OxirsError> {
        let signature = Self::compute_signature(query);
        let now = Self::now_ms();
        let cached_plan = CachedPlan {
            plan: Self::convert_to_serializable(&plan),
            signature,
            cached_at_ms: now,
            access_count: 0,
            last_accessed_ms: now,
            estimated_cost,
            avg_execution_time_ms: 0.0,
        };
        let mut cache = self
            .cache
            .write()
            .map_err(|e| OxirsError::Query(format!("Failed to write cache: {}", e)))?;
        let will_evict = !cache.contains(&signature) && cache.len() >= cache.cap().get();
        if will_evict {
            self.record_eviction();
        }
        cache.put(signature, cached_plan);
        let mut stats = self
            .stats
            .write()
            .map_err(|e| OxirsError::Query(format!("Failed to write stats: {}", e)))?;
        stats.current_size = cache.len();
        stats.total_cached += 1;
        Ok(())
    }

    /// Removes and returns the entry for `query`, if present.
    ///
    /// Returns `None` when there is no such entry or the cache lock is poisoned.
    pub fn remove(&self, query: &str) -> Option<CachedPlan> {
        let signature = Self::compute_signature(query);
        let mut cache = self.cache.write().ok()?;
        let removed = cache.pop(&signature);
        if removed.is_some() {
            self.sync_size(cache.len());
        }
        removed
    }

    /// Drops every cached plan and resets the reported size to zero.
    ///
    /// Hit, miss and eviction totals are left untouched.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Query` when the cache or statistics lock is poisoned.
    pub fn clear(&self) -> Result<(), OxirsError> {
        let mut cache = self
            .cache
            .write()
            .map_err(|e| OxirsError::Query(format!("Failed to write cache: {}", e)))?;
        cache.clear();
        let mut stats = self
            .stats
            .write()
            .map_err(|e| OxirsError::Query(format!("Failed to write stats: {}", e)))?;
        stats.current_size = 0;
        Ok(())
    }

    /// Returns a snapshot of the statistics, or all-zero statistics when the
    /// statistics lock is poisoned.
    pub fn statistics(&self) -> CacheStatistics {
        self.stats
            .read()
            .ok()
            .map(|guard| guard.clone())
            .unwrap_or_default()
    }

    /// Returns `hits / (hits + misses)`, or `0.0` before any lookup was recorded.
    pub fn hit_rate(&self) -> f64 {
        let stats = self.statistics();
        let total = stats.hits + stats.misses;
        if total == 0 {
            0.0
        } else {
            stats.hits as f64 / total as f64
        }
    }

    /// Writes all cached plans to the configured file as pretty-printed JSON.
    ///
    /// Entries are written most-recently-used first. Does nothing when
    /// persistence is disabled.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Io` when no path is configured or the write fails,
    /// `OxirsError::Query` when the cache lock is poisoned, and
    /// `OxirsError::Serialize` when JSON encoding fails.
    pub fn persist(&self) -> Result<(), OxirsError> {
        if !self.config.enable_persistence {
            return Ok(());
        }
        let path = self
            .config
            .persistence_path
            .as_ref()
            .ok_or_else(|| OxirsError::Io("No persistence path configured".to_string()))?;
        // Snapshot inside a narrow scope so the lock is released before
        // serialization and file I/O.
        let entries: Vec<(u64, CachedPlan)> = {
            let cache = self
                .cache
                .read()
                .map_err(|e| OxirsError::Query(format!("Failed to read cache: {}", e)))?;
            cache.iter().map(|(k, v)| (*k, v.clone())).collect()
        };
        let json = serde_json::to_string_pretty(&entries)
            .map_err(|e| OxirsError::Serialize(e.to_string()))?;
        std::fs::write(path, json).map_err(|e| OxirsError::Io(e.to_string()))?;
        tracing::info!("Persisted {} cached plans to {}", entries.len(), path);
        Ok(())
    }

    /// Loads plans previously written by `persist` into the cache.
    ///
    /// Does nothing when persistence is disabled or the file does not exist.
    /// Entries whose stored `cached_at_ms` is older than the TTL expire on
    /// their first lookup.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Io` when no path is configured or reading fails,
    /// `OxirsError::Parse` when the file is not valid JSON for the expected
    /// shape, and `OxirsError::Query` when the cache lock is poisoned.
    pub fn load(&self) -> Result<(), OxirsError> {
        if !self.config.enable_persistence {
            return Ok(());
        }
        let path = self
            .config
            .persistence_path
            .as_ref()
            .ok_or_else(|| OxirsError::Io("No persistence path configured".to_string()))?;
        if !Path::new(path).exists() {
            return Ok(());
        }
        let json = std::fs::read_to_string(path).map_err(|e| OxirsError::Io(e.to_string()))?;
        let entries: Vec<(u64, CachedPlan)> =
            serde_json::from_str(&json).map_err(|e| OxirsError::Parse(e.to_string()))?;
        let mut cache = self
            .cache
            .write()
            .map_err(|e| OxirsError::Query(format!("Failed to write cache: {}", e)))?;
        // The file lists entries most-recently-used first. Replaying it in
        // reverse restores the original recency order and, if the file holds
        // more entries than the capacity, keeps the most recent ones.
        for (sig, plan) in entries.into_iter().rev() {
            cache.put(sig, plan);
        }
        self.sync_size(cache.len());
        tracing::info!("Loaded {} cached plans from {}", cache.len(), path);
        Ok(())
    }

    /// Folds a measured execution time into the entry's running average.
    ///
    /// The first sample is stored as-is; later samples are blended with an
    /// exponential moving average that weights the new sample at 0.3. A query
    /// without a cached entry is silently ignored. The lookup uses `get_mut`,
    /// so it also marks the entry as most recently used.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Query` when the cache lock is poisoned.
    pub fn update_execution_time(
        &self,
        query: &str,
        execution_time_ms: f64,
    ) -> Result<(), OxirsError> {
        const ALPHA: f64 = 0.3;
        let signature = Self::compute_signature(query);
        let mut cache = self
            .cache
            .write()
            .map_err(|e| OxirsError::Query(format!("Failed to write cache: {}", e)))?;
        if let Some(plan) = cache.get_mut(&signature) {
            // An average of exactly 0.0 marks "no sample recorded yet".
            if plan.avg_execution_time_ms == 0.0 {
                plan.avg_execution_time_ms = execution_time_ms;
            } else {
                plan.avg_execution_time_ms =
                    ALPHA * execution_time_ms + (1.0 - ALPHA) * plan.avg_execution_time_ms;
            }
        }
        Ok(())
    }

    /// Hashes the raw query text into the cache key.
    ///
    /// NOTE(review): `DefaultHasher`'s algorithm is not guaranteed to stay the
    /// same across Rust releases, so signatures in a persisted file may stop
    /// matching after a toolchain upgrade — confirm whether that matters.
    fn compute_signature(query: &str) -> u64 {
        let mut hasher = DefaultHasher::new();
        query.hash(&mut hasher);
        hasher.finish()
    }

    /// Current wall-clock time in milliseconds since the Unix epoch.
    ///
    /// Wall-clock time is used because the timestamps are persisted and must
    /// stay comparable across process restarts. Returns 0 if the system clock
    /// reports a time before the epoch.
    fn now_ms() -> u128 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|since_epoch| since_epoch.as_millis())
            .unwrap_or(0)
    }

    /// Best-effort refresh of `stats.current_size`; a poisoned statistics lock
    /// is ignored. Callers hold the cache lock, matching the cache-then-stats
    /// lock order used by `put` and `clear`.
    fn sync_size(&self, len: usize) {
        if let Ok(mut stats) = self.stats.write() {
            stats.current_size = len;
        }
    }

    /// Recursively mirrors an `ExecutionPlan` into its serializable form.
    ///
    /// Patterns, conditions and variables are stored as `Debug` strings; any
    /// variant without a mirror becomes `SerializablePlan::Empty`.
    fn convert_to_serializable(plan: &ExecutionPlan) -> SerializablePlan {
        match plan {
            ExecutionPlan::TripleScan { pattern } => SerializablePlan::TripleScan {
                pattern_desc: format!("{:?}", pattern),
            },
            ExecutionPlan::HashJoin {
                left,
                right,
                join_vars,
            } => SerializablePlan::HashJoin {
                left: Box::new(Self::convert_to_serializable(left)),
                right: Box::new(Self::convert_to_serializable(right)),
                join_vars: join_vars.iter().map(|v| format!("{:?}", v)).collect(),
            },
            ExecutionPlan::Filter { input, condition } => SerializablePlan::Filter {
                input: Box::new(Self::convert_to_serializable(input)),
                expr_desc: format!("{:?}", condition),
            },
            ExecutionPlan::Project { input, vars } => SerializablePlan::Project {
                input: Box::new(Self::convert_to_serializable(input)),
                variables: vars.iter().map(|v| format!("{:?}", v)).collect(),
            },
            ExecutionPlan::Union { left, right } => SerializablePlan::Union {
                left: Box::new(Self::convert_to_serializable(left)),
                right: Box::new(Self::convert_to_serializable(right)),
            },
            _ => SerializablePlan::Empty,
        }
    }

    /// Records one hit in the metric counter and, best-effort, in the statistics.
    fn record_hit(&self) {
        self.hit_counter.add(1);
        if let Ok(mut stats) = self.stats.write() {
            stats.hits += 1;
        }
    }

    /// Records one miss in the metric counter and, best-effort, in the statistics.
    fn record_miss(&self) {
        self.miss_counter.add(1);
        if let Ok(mut stats) = self.stats.write() {
            stats.misses += 1;
        }
    }

    /// Records one eviction in the metric counter and, best-effort, in the statistics.
    fn record_eviction(&self) {
        self.eviction_counter.add(1);
        if let Ok(mut stats) = self.stats.write() {
            stats.evictions += 1;
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a cache from `CacheConfig::default()`.
    fn default_cache() -> LruQueryPlanCache {
        LruQueryPlanCache::new(CacheConfig::default())
    }

    /// Builds a `TripleScan` plan over a pattern made from three `None` components.
    fn scan_plan() -> ExecutionPlan {
        ExecutionPlan::TripleScan {
            pattern: crate::model::pattern::TriplePattern::new(None, None, None),
        }
    }

    /// A new cache reports no hits and no misses.
    #[test]
    fn test_cache_creation() {
        let snapshot = default_cache().statistics();
        assert_eq!(snapshot.hits, 0);
        assert_eq!(snapshot.misses, 0);
    }

    /// A stored plan can be read back with its cost intact.
    #[test]
    fn test_cache_put_get() {
        let cache = default_cache();
        let sparql = "SELECT ?s ?p ?o WHERE { ?s ?p ?o }";
        cache
            .put(sparql, scan_plan(), 100.0)
            .expect("cache put should succeed");

        let looked_up = cache.get(sparql);
        assert!(looked_up.is_some());
        assert_eq!(
            looked_up.expect("cached value should exist").estimated_cost,
            100.0
        );
    }

    /// Looking up an unknown query yields nothing and counts one miss.
    #[test]
    fn test_cache_miss() {
        let cache = default_cache();
        assert!(cache.get("SELECT ?s WHERE { ?s ?p ?o }").is_none());
        assert_eq!(cache.statistics().misses, 1);
    }

    /// A removed entry is no longer retrievable.
    #[test]
    fn test_cache_remove() {
        let cache = default_cache();
        let sparql = "SELECT ?s WHERE { ?s ?p ?o }";
        cache
            .put(sparql, scan_plan(), 50.0)
            .expect("cache put should succeed");
        assert!(cache.get(sparql).is_some());

        cache.remove(sparql);
        assert!(cache.get(sparql).is_none());
    }

    /// Clearing resets the reported size to zero.
    #[test]
    fn test_cache_clear() {
        let cache = default_cache();
        cache
            .put("query1", scan_plan(), 50.0)
            .expect("cache put should succeed");
        cache
            .put("query2", scan_plan(), 75.0)
            .expect("cache put should succeed");

        cache.clear().expect("cache clear should succeed");
        assert_eq!(cache.statistics().current_size, 0);
    }

    /// One hit plus one miss gives a 50% hit rate.
    #[test]
    fn test_hit_rate() {
        let cache = default_cache();
        let sparql = "SELECT * WHERE { ?s ?p ?o }";
        cache
            .put(sparql, scan_plan(), 100.0)
            .expect("cache put should succeed");

        cache.get(sparql);
        cache.get("SELECT * WHERE { ?x ?y ?z }");

        let rate = cache.hit_rate();
        assert!((rate - 0.5).abs() < 0.01);
    }

    /// With capacity two, the oldest of three inserted entries is evicted.
    #[test]
    fn test_lru_eviction() {
        let cache = LruQueryPlanCache::new(CacheConfig {
            max_size: 2,
            ..Default::default()
        });
        cache
            .put("query1", scan_plan(), 10.0)
            .expect("cache put should succeed");
        cache
            .put("query2", scan_plan(), 20.0)
            .expect("cache put should succeed");
        cache
            .put("query3", scan_plan(), 30.0)
            .expect("cache put should succeed");

        assert!(cache.get("query1").is_none());
        assert!(cache.get("query2").is_some());
        assert!(cache.get("query3").is_some());
    }

    /// The first sample is stored verbatim; the second is blended with it.
    #[test]
    fn test_execution_time_update() {
        let cache = default_cache();
        let sparql = "SELECT ?s WHERE { ?s ?p ?o }";
        cache
            .put(sparql, scan_plan(), 100.0)
            .expect("cache put should succeed");

        cache
            .update_execution_time(sparql, 50.0)
            .expect("update should succeed");
        let after_first = cache.get(sparql).expect("cache get should succeed");
        assert_eq!(after_first.avg_execution_time_ms, 50.0);

        cache
            .update_execution_time(sparql, 70.0)
            .expect("update should succeed");
        let after_second = cache.get(sparql).expect("cache get should succeed");
        assert!(
            after_second.avg_execution_time_ms > 50.0 && after_second.avg_execution_time_ms < 70.0
        );
    }
}
use std::collections::HashMap;
/// A query plan record stored by the simple plan cache.
#[derive(Debug, Clone)]
pub struct QueryPlan {
    /// Caller-supplied hash of the query.
    pub query_hash: u64,
    /// Query text the plan was built for.
    pub original_query: String,
    /// Caller-supplied descriptions of the optimized patterns.
    pub optimized_patterns: Vec<String>,
    /// Caller-supplied cost estimate.
    pub estimated_cost: f64,
    /// Creation time in milliseconds since the Unix epoch.
    pub created_at_ms: i64,
}

impl QueryPlan {
    /// Milliseconds elapsed since the Unix epoch, or 0 when the system clock
    /// reports a time before the epoch.
    fn now_ms() -> i64 {
        use std::time::{SystemTime, UNIX_EPOCH};
        match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_millis() as i64,
            Err(_) => 0,
        }
    }

    /// Builds a plan from its parts and stamps it with the current time.
    pub fn new(
        query_hash: u64,
        original_query: impl Into<String>,
        optimized_patterns: Vec<String>,
        estimated_cost: f64,
    ) -> Self {
        let query_text: String = original_query.into();
        let stamped_at = Self::now_ms();
        QueryPlan {
            query_hash,
            original_query: query_text,
            optimized_patterns,
            estimated_cost,
            created_at_ms: stamped_at,
        }
    }
}
/// Activity counters for `QueryPlanCache`; all fields start at zero.
#[derive(Debug, Clone, Default)]
pub struct PlanCacheStats {
/// Lookups that found a plan.
pub hits: u64,
/// Lookups that found nothing.
pub misses: u64,
/// Entries removed through `evict_lru` (explicitly or when inserting into a full cache).
pub evictions: u64,
/// Number of plans held after the last insert, eviction, invalidation or clear.
pub size: usize,
}
/// Simple LRU plan cache built from a `HashMap` plus an explicit recency list.
///
/// Methods that touch recency or statistics take `&mut self`.
pub struct QueryPlanCache {
/// Stored plans keyed by query hash.
plans: HashMap<u64, QueryPlan>,
/// Query hashes ordered from most recently used (front) to least recently used (back).
access_order: Vec<u64>,
/// Maximum number of plans held; `new` clamps it to at least 1.
max_size: usize,
/// Hit, miss, eviction and size counters.
stats: PlanCacheStats,
}
impl QueryPlanCache {
pub fn new(max_size: usize) -> Self {
let max_size = max_size.max(1);
Self {
plans: HashMap::new(),
access_order: Vec::new(),
max_size,
stats: PlanCacheStats::default(),
}
}
pub fn get(&mut self, query_hash: u64) -> Option<&QueryPlan> {
if self.plans.contains_key(&query_hash) {
if let Some(pos) = self.access_order.iter().position(|&h| h == query_hash) {
self.access_order.remove(pos);
}
self.access_order.insert(0, query_hash);
self.stats.hits += 1;
self.plans.get(&query_hash)
} else {
self.stats.misses += 1;
None
}
}
pub fn insert(&mut self, plan: QueryPlan) {
let hash = plan.query_hash;
if self.plans.contains_key(&hash) {
if let Some(pos) = self.access_order.iter().position(|&h| h == hash) {
self.access_order.remove(pos);
}
} else if self.plans.len() >= self.max_size {
self.evict_lru();
}
self.plans.insert(hash, plan);
self.access_order.insert(0, hash);
self.stats.size = self.plans.len();
}
pub fn evict_lru(&mut self) -> Option<QueryPlan> {
let lru_hash = self.access_order.pop()?;
let plan = self.plans.remove(&lru_hash);
self.stats.evictions += 1;
self.stats.size = self.plans.len();
plan
}
pub fn invalidate(&mut self, query_hash: u64) -> bool {
let existed = self.plans.remove(&query_hash).is_some();
if existed {
if let Some(pos) = self.access_order.iter().position(|&h| h == query_hash) {
self.access_order.remove(pos);
}
self.stats.size = self.plans.len();
}
existed
}
pub fn clear(&mut self) {
self.plans.clear();
self.access_order.clear();
self.stats.size = 0;
}
pub fn stats(&self) -> &PlanCacheStats {
&self.stats
}
pub fn hit_rate(&self) -> f64 {
let total = self.stats.hits + self.stats.misses;
if total == 0 {
0.0
} else {
self.stats.hits as f64 / total as f64
}
}
pub fn len(&self) -> usize {
self.plans.len()
}
pub fn is_empty(&self) -> bool {
self.plans.is_empty()
}
}
#[cfg(test)]
mod simple_cache_tests {
    use super::*;

    /// Builds a plan with a single `"scan"` pattern.
    fn make_plan(hash: u64, query: &str, cost: f64) -> QueryPlan {
        let patterns = vec!["scan".to_string()];
        QueryPlan::new(hash, query, patterns, cost)
    }

    /// A new cache holds nothing.
    #[test]
    fn test_simple_cache_new_is_empty() {
        let fresh = QueryPlanCache::new(10);
        assert!(fresh.is_empty());
        assert_eq!(fresh.len(), 0);
    }

    /// An inserted plan is found under its hash.
    #[test]
    fn test_simple_cache_insert_and_get_hit() {
        let mut store = QueryPlanCache::new(10);
        store.insert(make_plan(1, "SELECT ?s ?p ?o WHERE {?s ?p ?o}", 100.0));
        let found = store.get(1);
        assert!(found.is_some());
        assert_eq!(found.expect("plan should exist").query_hash, 1);
    }

    /// A failed lookup counts as a miss only.
    #[test]
    fn test_simple_cache_miss_increments_stat() {
        let mut store = QueryPlanCache::new(10);
        assert!(store.get(42).is_none());
        assert_eq!(store.stats().misses, 1);
        assert_eq!(store.stats().hits, 0);
    }

    /// A successful lookup counts as a hit only.
    #[test]
    fn test_simple_cache_hit_increments_stat() {
        let mut store = QueryPlanCache::new(10);
        store.insert(make_plan(7, "SELECT * WHERE {?s ?p ?o}", 50.0));
        store.get(7);
        assert_eq!(store.stats().hits, 1);
        assert_eq!(store.stats().misses, 0);
    }

    /// Without lookups the hit rate is zero.
    #[test]
    fn test_simple_cache_hit_rate_zero_initially() {
        let fresh = QueryPlanCache::new(5);
        assert_eq!(fresh.hit_rate(), 0.0);
    }

    /// One hit and one miss give a hit rate of one half.
    #[test]
    fn test_simple_cache_hit_rate_after_ops() {
        let mut store = QueryPlanCache::new(5);
        store.insert(make_plan(1, "q1", 10.0));
        store.get(1);
        store.get(2);
        let observed = store.hit_rate();
        assert!((observed - 0.5).abs() < 1e-9);
    }

    /// Inserting into a full cache evicts the oldest untouched entry.
    #[test]
    fn test_simple_cache_evict_lru_on_full() {
        let mut store = QueryPlanCache::new(2);
        for &(hash, text, cost) in &[(1u64, "q1", 1.0f64), (2, "q2", 2.0), (3, "q3", 3.0)] {
            store.insert(make_plan(hash, text, cost));
        }
        assert_eq!(store.len(), 2);
        assert!(store.get(1).is_none(), "q1 should have been evicted");
        assert!(store.get(2).is_some());
        assert!(store.get(3).is_some());
    }

    /// A lookup refreshes recency, so the other entry is evicted instead.
    #[test]
    fn test_simple_cache_evict_lru_access_order() {
        let mut store = QueryPlanCache::new(2);
        store.insert(make_plan(1, "q1", 1.0));
        store.insert(make_plan(2, "q2", 2.0));
        store.get(1);
        store.insert(make_plan(3, "q3", 3.0));
        assert!(store.get(2).is_none(), "q2 should have been evicted");
        assert!(store.get(1).is_some());
        assert!(store.get(3).is_some());
    }

    /// Explicit eviction returns a plan and updates the counters.
    #[test]
    fn test_simple_cache_evict_lru_explicit() {
        let mut store = QueryPlanCache::new(5);
        store.insert(make_plan(1, "q1", 1.0));
        store.insert(make_plan(2, "q2", 2.0));
        let dropped = store.evict_lru();
        assert!(dropped.is_some());
        assert_eq!(store.stats().evictions, 1);
        assert_eq!(store.len(), 1);
    }

    /// Evicting from an empty cache yields nothing.
    #[test]
    fn test_simple_cache_evict_lru_empty_returns_none() {
        let mut store = QueryPlanCache::new(5);
        assert!(store.evict_lru().is_none());
    }

    /// Invalidating a stored hash removes it and reports success.
    #[test]
    fn test_simple_cache_invalidate_existing() {
        let mut store = QueryPlanCache::new(5);
        store.insert(make_plan(99, "q99", 9.0));
        let was_present = store.invalidate(99);
        assert!(was_present);
        assert!(store.is_empty());
    }

    /// Invalidating an unknown hash reports failure.
    #[test]
    fn test_simple_cache_invalidate_missing_returns_false() {
        let mut store = QueryPlanCache::new(5);
        assert!(!store.invalidate(404));
    }

    /// Clearing empties the cache and zeroes the reported size.
    #[test]
    fn test_simple_cache_clear() {
        let mut store = QueryPlanCache::new(5);
        store.insert(make_plan(1, "q1", 1.0));
        store.insert(make_plan(2, "q2", 2.0));
        store.clear();
        assert!(store.is_empty());
        assert_eq!(store.stats().size, 0);
    }

    /// The constructor stores every argument and a non-negative timestamp.
    #[test]
    fn test_simple_cache_plan_fields() {
        let patterns = vec!["index_scan".to_string()];
        let built = QueryPlan::new(42, "SELECT * WHERE {?s ?p ?o}", patterns, 2.5);
        assert_eq!(built.query_hash, 42);
        assert_eq!(built.optimized_patterns, vec!["index_scan".to_string()]);
        assert!((built.estimated_cost - 2.5).abs() < 1e-9);
        assert!(built.created_at_ms >= 0);
    }

    /// The size statistic follows the number of inserted plans.
    #[test]
    fn test_simple_cache_stats_size_tracks_inserts() {
        let mut store = QueryPlanCache::new(10);
        store.insert(make_plan(1, "q1", 1.0));
        store.insert(make_plan(2, "q2", 2.0));
        assert_eq!(store.stats().size, 2);
    }

    /// Re-inserting an existing hash does not grow the cache.
    #[test]
    fn test_simple_cache_reinsertion_updates_access_order() {
        let mut store = QueryPlanCache::new(3);
        store.insert(make_plan(1, "q1", 1.0));
        store.insert(make_plan(2, "q2", 2.0));
        store.insert(make_plan(1, "q1", 1.5));
        store.insert(make_plan(3, "q3", 3.0));
        assert_eq!(store.len(), 3);
    }
}