use crate::{
algebra::{Algebra, Solution, Term, Variable},
query::Query,
Result,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::Duration;
/// Marker trait for types usable as cache keys: hashable, comparable,
/// cloneable, and safe to share across threads.
pub trait CacheKey: Clone + std::hash::Hash + Eq + Send + Sync {}
/// Marker trait for types storable as cache values: cloneable and safe to
/// share across threads.
pub trait CacheValue: Clone + Send + Sync {}
// Plain strings key the statistics cache (see `ArqCacheManager::statistics_cache`).
impl CacheKey for String {}
// Statistics snapshots are cached under those string keys.
impl CacheValue for StatisticsSnapshot {}
#[derive(Debug)]
pub struct AdvancedCache<K, V> {
_phantom: std::marker::PhantomData<(K, V)>,
}
impl<K: CacheKey, V: CacheValue> AdvancedCache<K, V> {
pub fn new(_config: AdvancedCacheConfig) -> Self {
Self {
_phantom: std::marker::PhantomData,
}
}
pub fn get(&self, _key: &K) -> Option<V> {
None
}
pub fn put(&self, _key: K, _value: V) -> Result<()> {
Ok(())
}
pub fn warm_cache(&self) -> Result<()> {
Ok(())
}
pub fn clear(&self) {
}
}
/// Sizing and behavior knobs for a single [`AdvancedCache`] instance.
#[derive(Debug, Clone)]
pub struct AdvancedCacheConfig {
    /// Capacity of the first (fastest) cache level — presumably max entries; confirm units.
    pub l1_cache_size: usize,
    /// Capacity of the second cache level.
    pub l2_cache_size: usize,
    /// Capacity of the third (largest) cache level.
    pub l3_cache_size: usize,
    /// Whether cached values should be stored compressed — TODO(review):
    /// nothing in this file acts on this flag yet; confirm the consumer.
    pub enable_compression: bool,
}
impl Default for AdvancedCacheConfig {
fn default() -> Self {
Self {
l1_cache_size: 1024,
l2_cache_size: 4096,
l3_cache_size: 16384,
enable_compression: false,
}
}
}
/// Aggregate configuration for [`ArqCacheManager`]: one cache configuration
/// per cached artifact, plus cross-cutting caching policies.
#[derive(Debug, Clone)]
pub struct ArqCacheConfig {
    /// Sizing for the compiled query-plan cache.
    pub query_plan_cache: AdvancedCacheConfig,
    /// Sizing for the materialized query-result cache.
    pub query_result_cache: AdvancedCacheConfig,
    /// Sizing for the basic-graph-pattern (BGP) result cache.
    pub bgp_cache: AdvancedCacheConfig,
    /// Sizing for the dataset-statistics cache.
    pub statistics_cache: AdvancedCacheConfig,
    /// Presumably lets one query's cached results serve similar queries —
    /// TODO confirm; in this file it only feeds the optimizer config hash.
    pub enable_cross_query_caching: bool,
    /// Largest result (in bytes) eligible for caching; larger results are
    /// silently skipped by `cache_query_result`.
    pub max_result_size: usize,
    /// Similarity threshold (fraction in [0, 1]) for cross-query reuse;
    /// currently only feeds the optimizer config hash.
    pub query_similarity_threshold: f64,
}
impl Default for ArqCacheConfig {
    /// Defaults: large identical caches for plans, BGPs and statistics; a
    /// smaller, compressed cache for (bulkier) materialized results;
    /// cross-query caching on, 10 MiB result cap, 0.8 similarity threshold.
    fn default() -> Self {
        // Shared sizing for the plan / BGP / statistics caches.
        let shared = AdvancedCacheConfig {
            l1_cache_size: 5000,
            l2_cache_size: 20000,
            l3_cache_size: 100_000,
            enable_compression: false,
        };
        Self {
            query_plan_cache: shared.clone(),
            bgp_cache: shared.clone(),
            statistics_cache: shared,
            // Results are bulky, so this cache is smaller and compressed.
            query_result_cache: AdvancedCacheConfig {
                l1_cache_size: 1000,
                l2_cache_size: 5000,
                l3_cache_size: 20000,
                enable_compression: true,
            },
            enable_cross_query_caching: true,
            max_result_size: 10 * 1024 * 1024,
            query_similarity_threshold: 0.8,
        }
    }
}
/// A fully optimized query plan plus the context needed to judge whether it
/// is still valid to reuse.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedQueryPlan {
    /// The optimized algebra expression to execute.
    pub algebra: Algebra,
    /// The optimizer's cost estimate for this plan.
    pub estimated_cost: f64,
    /// What the optimizer did and what it assumed while producing the plan.
    pub optimization_metadata: OptimizationMetadata,
    /// Dataset statistics the plan was optimized against.
    pub statistics_snapshot: StatisticsSnapshot,
}
/// Record of the optimization pass that produced a cached plan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationMetadata {
    /// Names of the optimization passes applied — presumably in application
    /// order; confirm with the producer.
    pub optimizations_applied: Vec<String>,
    /// Wall-clock time spent optimizing, in milliseconds.
    pub optimization_time_ms: u64,
    /// Estimated selectivity per key — key format is set by the producer
    /// (likely pattern or predicate identifiers); confirm.
    pub selectivity_estimates: HashMap<String, f64>,
    /// Chosen join order, as producer-defined identifiers.
    pub join_order: Vec<String>,
}
/// Point-in-time dataset statistics, captured alongside a plan so staleness
/// can be detected later.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatisticsSnapshot {
    /// Overall dataset size — presumably a triple/quad count; confirm units.
    pub dataset_size: usize,
    /// Occurrence count per predicate (keyed by predicate string/IRI).
    pub predicate_cardinalities: HashMap<String, usize>,
    /// When the snapshot was taken — epoch-based; exact unit (secs/millis)
    /// is not fixed in this file, confirm against the producer.
    pub timestamp: u64,
    /// Version tag of the statistics format or collection run.
    pub version: String,
}
/// Materialized solutions for a query, plus metadata used for accounting
/// and validity checks.
#[derive(Debug, Clone)]
pub struct CachedQueryResult {
    /// The cached solution sequence.
    pub solutions: Vec<Solution>,
    /// Execution context captured when the result was produced.
    pub metadata: QueryResultMetadata,
    /// Approximate in-memory size; compared against
    /// `ArqCacheConfig::max_result_size` before the result is admitted.
    pub size_bytes: usize,
}
/// Context captured alongside a cached query result.
#[derive(Debug, Clone)]
pub struct QueryResultMetadata {
    /// How long the original execution took; credited to the manager's
    /// `time_saved_ms` on every cache hit.
    pub execution_time: Duration,
    /// Dataset version the result was computed against.
    pub dataset_version: String,
    /// Projected variables, in order.
    pub variables: Vec<Variable>,
    /// Number of solutions in the cached result.
    pub solution_count: usize,
    /// Whether the result is complete (vs. truncated, e.g. by LIMIT) —
    /// TODO(review): confirm exact semantics with the producer.
    pub is_complete: bool,
}
/// Cache key for compiled plans: a plan is only reusable when the query,
/// schema, optimization level, and optimizer configuration all match.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct QueryPlanCacheKey {
    /// Hash of the query (currently derived from its `Debug` rendering).
    pub query_hash: u64,
    /// Hash of the schema the plan assumes.
    pub schema_hash: u64,
    /// Optimization level the plan was built with.
    pub optimization_level: OptimizationLevel,
    /// Hash of the optimizer-relevant configuration values.
    pub config_hash: u64,
}
/// Cache key for materialized results. `Hash` is implemented manually
/// (below) because the `HashMap` field has no `Hash` impl and its iteration
/// order is nondeterministic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryResultCacheKey {
    /// Canonical signature of the query.
    pub query_signature: QuerySignature,
    /// Dataset version the cached result must match.
    pub dataset_version: String,
    /// Concrete values bound to query parameters.
    pub parameter_bindings: HashMap<String, String>,
}
impl std::hash::Hash for QueryResultCacheKey {
    /// Deterministic hashing despite the `HashMap` field: parameter bindings
    /// are fed to the hasher in sorted key order, so two equal keys always
    /// produce the same hash regardless of map iteration order.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.query_signature.hash(state);
        self.dataset_version.hash(state);
        // A BTreeMap iterates in ascending key order, giving the same
        // sequence the original sort-then-iterate approach produced.
        let ordered: std::collections::BTreeMap<_, _> =
            self.parameter_bindings.iter().collect();
        for (name, value) in ordered {
            name.hash(state);
            value.hash(state);
        }
    }
}
/// Canonicalized identity of a query, used for result-cache keying (and,
/// per the config flags, intended for cross-query similarity matching).
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct QuerySignature {
    /// Canonical textual form — currently the `Debug` rendering of the query.
    pub canonical_form: String,
    /// Names of the query's select variables.
    pub variables: Vec<String>,
    /// Kind of operation (SELECT, ASK, ...).
    pub operation_type: QueryOperationType,
    /// Heuristic complexity estimate — currently a fixed placeholder (100).
    pub complexity_score: u32,
}
/// SPARQL operation kinds distinguished for caching purposes.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum QueryOperationType {
    /// SELECT query.
    Select,
    /// CONSTRUCT query.
    Construct,
    /// ASK query.
    Ask,
    /// DESCRIBE query.
    Describe,
    /// Update operation.
    Update,
}
/// Optimizer effort level; part of the plan-cache key so plans built at
/// different levels never collide.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum OptimizationLevel {
    /// Minimal optimization.
    Basic,
    /// Default optimization pipeline.
    Standard,
    /// Maximum optimization effort — presumably including expensive passes;
    /// confirm against the optimizer.
    Aggressive,
    /// Caller-defined level, identified by an opaque code.
    Custom(u32),
}
/// Cache key for a basic-graph-pattern (BGP) evaluation.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct BgpCacheKey {
    /// Hash of the triple-pattern set.
    pub pattern_hash: u64,
    /// Order-independent hash of the variable bindings in effect
    /// (see `ArqCacheManager::hash_bindings`).
    pub bindings_hash: u64,
    /// Named graph the pattern was evaluated in, if any.
    pub graph_context: Option<String>,
}
/// Cached solutions for a single BGP evaluation.
#[derive(Debug, Clone)]
pub struct CachedBgpResult {
    /// Solutions produced by the evaluation.
    pub solutions: Vec<Solution>,
    /// Measurements taken during the evaluation.
    pub metadata: BgpEvaluationMetadata,
}
/// Measurements taken while evaluating a BGP.
#[derive(Debug, Clone)]
pub struct BgpEvaluationMetadata {
    /// Wall-clock evaluation time.
    pub evaluation_time: Duration,
    /// Number of solutions produced.
    pub solution_count: usize,
    /// Number of index lookups that hit — assumed meaning; confirm producer.
    pub index_hits: usize,
    /// Observed selectivity of the pattern.
    pub selectivity: f64,
}
/// Coordinates the four ARQ-style caches (plans, results, BGPs, statistics)
/// and tracks aggregate hit/miss statistics. Every cache is `Arc`-wrapped so
/// handles can be shared cheaply across threads.
pub struct ArqCacheManager {
    /// Compiled/optimized query plans.
    query_plan_cache: Arc<AdvancedCache<QueryPlanCacheKey, CachedQueryPlan>>,
    /// Materialized query results.
    query_result_cache: Arc<AdvancedCache<QueryResultCacheKey, CachedQueryResult>>,
    /// Per-BGP evaluation results.
    bgp_cache: Arc<AdvancedCache<BgpCacheKey, CachedBgpResult>>,
    /// Dataset-statistics snapshots, keyed by an arbitrary string label.
    statistics_cache: Arc<AdvancedCache<String, StatisticsSnapshot>>,
    /// Configuration the caches were built from; also feeds the config hash
    /// used in plan-cache keys.
    config: ArqCacheConfig,
    /// Aggregate hit/miss/latency statistics, guarded for concurrent access.
    cache_stats: Arc<std::sync::RwLock<ArqCacheStatistics>>,
}
/// Aggregate cache statistics, updated on every lookup through the manager.
#[derive(Debug, Clone, Default)]
pub struct ArqCacheStatistics {
    /// Plan-cache lookups that found an entry.
    pub plan_cache_hits: usize,
    /// Plan-cache lookups that missed.
    pub plan_cache_misses: usize,
    /// Result-cache lookups that found an entry.
    pub result_cache_hits: usize,
    /// Result-cache lookups that missed.
    pub result_cache_misses: usize,
    /// BGP-cache lookups that found an entry.
    pub bgp_cache_hits: usize,
    /// BGP-cache lookups that missed.
    pub bgp_cache_misses: usize,
    /// Sum of original execution times credited on result-cache hits (ms).
    pub time_saved_ms: u64,
    /// Running average lookup latency across all caches (microseconds).
    pub avg_lookup_time_us: f64,
    /// Blended score: 70% overall hit rate + 30% latency (sub-millisecond).
    pub efficiency_score: f64,
}
impl ArqCacheManager {
    /// Builds a manager with one `AdvancedCache` per artifact kind, sized
    /// from `config`, and zeroed statistics.
    pub fn new(config: ArqCacheConfig) -> Self {
        Self {
            query_plan_cache: Arc::new(AdvancedCache::new(config.query_plan_cache.clone())),
            query_result_cache: Arc::new(AdvancedCache::new(config.query_result_cache.clone())),
            bgp_cache: Arc::new(AdvancedCache::new(config.bgp_cache.clone())),
            statistics_cache: Arc::new(AdvancedCache::new(config.statistics_cache.clone())),
            config,
            cache_stats: Arc::new(std::sync::RwLock::new(ArqCacheStatistics::default())),
        }
    }
    /// Looks up a cached query plan, recording a hit or miss and the lookup
    /// latency in the aggregate statistics.
    pub fn get_query_plan(&self, key: &QueryPlanCacheKey) -> Option<CachedQueryPlan> {
        let start_time = std::time::Instant::now();
        let result = self.query_plan_cache.get(key);
        {
            let mut stats = self.cache_stats.write().expect("lock poisoned");
            if result.is_some() {
                stats.plan_cache_hits += 1;
            } else {
                stats.plan_cache_misses += 1;
            }
            self.update_avg_lookup_time(&mut stats, start_time.elapsed());
        }
        result
    }
    /// Stores an optimized plan under `key`.
    pub fn cache_query_plan(&self, key: QueryPlanCacheKey, plan: CachedQueryPlan) -> Result<()> {
        self.query_plan_cache.put(key, plan)?;
        Ok(())
    }
    /// Looks up a cached query result. Entries failing the validity check
    /// are treated — and now also counted — as misses. On a hit, the
    /// original execution time is credited to `time_saved_ms`.
    pub fn get_query_result(&self, key: &QueryResultCacheKey) -> Option<CachedQueryResult> {
        let start_time = std::time::Instant::now();
        if !self.is_result_cache_valid(key) {
            // Fix: a stale/invalid entry is a miss. Previously this path
            // returned without touching the statistics, understating misses.
            let mut stats = self.cache_stats.write().expect("lock poisoned");
            stats.result_cache_misses += 1;
            self.update_avg_lookup_time(&mut stats, start_time.elapsed());
            return None;
        }
        let result = self.query_result_cache.get(key);
        {
            let mut stats = self.cache_stats.write().expect("lock poisoned");
            if let Some(ref cached_result) = result {
                stats.result_cache_hits += 1;
                // Credit the time the original execution took.
                stats.time_saved_ms += cached_result.metadata.execution_time.as_millis() as u64;
            } else {
                stats.result_cache_misses += 1;
            }
            self.update_avg_lookup_time(&mut stats, start_time.elapsed());
        }
        result
    }
    /// Stores a query result unless it exceeds `max_result_size`; oversized
    /// results are silently skipped (caching is best-effort, not an error).
    pub fn cache_query_result(
        &self,
        key: QueryResultCacheKey,
        result: CachedQueryResult,
    ) -> Result<()> {
        if result.size_bytes > self.config.max_result_size {
            return Ok(());
        }
        self.query_result_cache.put(key, result)?;
        Ok(())
    }
    /// Looks up cached BGP solutions, recording hit/miss and latency.
    pub fn get_bgp_result(&self, key: &BgpCacheKey) -> Option<CachedBgpResult> {
        let start_time = std::time::Instant::now();
        let result = self.bgp_cache.get(key);
        {
            let mut stats = self.cache_stats.write().expect("lock poisoned");
            if result.is_some() {
                stats.bgp_cache_hits += 1;
            } else {
                stats.bgp_cache_misses += 1;
            }
            self.update_avg_lookup_time(&mut stats, start_time.elapsed());
        }
        result
    }
    /// Stores the solutions of a BGP evaluation.
    pub fn cache_bgp_result(&self, key: BgpCacheKey, result: CachedBgpResult) -> Result<()> {
        self.bgp_cache.put(key, result)?;
        Ok(())
    }
    /// Builds a plan-cache key from the query plus the schema and optimizer
    /// context it was (or will be) compiled under.
    pub fn create_plan_cache_key(
        &self,
        query: &Query,
        schema_hash: u64,
        optimization_level: OptimizationLevel,
    ) -> QueryPlanCacheKey {
        let query_hash = self.hash_query(query);
        let config_hash = self.hash_config();
        QueryPlanCacheKey {
            query_hash,
            schema_hash,
            optimization_level,
            config_hash,
        }
    }
    /// Builds a result-cache key from the query's signature, the dataset
    /// version, and the concrete parameter bindings.
    pub fn create_result_cache_key(
        &self,
        query: &Query,
        dataset_version: String,
        parameter_bindings: HashMap<String, String>,
    ) -> QueryResultCacheKey {
        let query_signature = self.create_query_signature(query);
        QueryResultCacheKey {
            query_signature,
            dataset_version,
            parameter_bindings,
        }
    }
    /// Builds a BGP cache key; the bindings hash is order-independent.
    pub fn create_bgp_cache_key(
        &self,
        pattern_hash: u64,
        bindings: &HashMap<Variable, Term>,
        graph_context: Option<&str>,
    ) -> BgpCacheKey {
        let bindings_hash = self.hash_bindings(bindings);
        BgpCacheKey {
            pattern_hash,
            bindings_hash,
            graph_context: graph_context.map(|s| s.to_string()),
        }
    }
    /// Returns a snapshot (clone) of the aggregate cache statistics.
    pub fn get_statistics(&self) -> ArqCacheStatistics {
        let stats = self.cache_stats.read().expect("lock poisoned");
        stats.clone()
    }
    /// Pre-populates all caches. Fix: the statistics cache was previously
    /// skipped here even though `clear_all_caches` covers all four caches —
    /// keep the two operations symmetric.
    pub fn warm_caches(&self) -> Result<()> {
        self.query_plan_cache.warm_cache()?;
        self.query_result_cache.warm_cache()?;
        self.bgp_cache.warm_cache()?;
        self.statistics_cache.warm_cache()?;
        Ok(())
    }
    /// Empties every cache and resets the statistics to zero.
    pub fn clear_all_caches(&self) {
        self.query_plan_cache.clear();
        self.query_result_cache.clear();
        self.bgp_cache.clear();
        self.statistics_cache.clear();
        {
            let mut stats = self.cache_stats.write().expect("lock poisoned");
            *stats = ArqCacheStatistics::default();
        }
    }
    /// Invalidation hook for dataset mutations. Currently coarse-grained:
    /// any change clears everything; `_changed_predicates` is reserved for
    /// finer-grained invalidation later.
    pub fn invalidate_on_dataset_change(&self, _changed_predicates: &[String]) -> Result<()> {
        self.clear_all_caches();
        Ok(())
    }
    /// Hashes a query via its `Debug` rendering.
    /// NOTE(review): any change to the Debug format changes the hash and
    /// silently invalidates cached plans — a real canonical form would be
    /// more stable; confirm whether `Query` offers one.
    fn hash_query(&self, query: &Query) -> u64 {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        format!("{query:?}").hash(&mut hasher);
        hasher.finish()
    }
    /// Hashes the config values that affect plan validity (similarity
    /// threshold via its exact bit pattern, and the cross-query flag).
    fn hash_config(&self) -> u64 {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        self.config
            .query_similarity_threshold
            .to_bits()
            .hash(&mut hasher);
        self.config.enable_cross_query_caching.hash(&mut hasher);
        hasher.finish()
    }
    /// Order-independent hash of variable bindings: entries are sorted by
    /// variable name first so `HashMap` iteration order cannot leak into
    /// the key. Terms are hashed via their `Debug` rendering.
    fn hash_bindings(&self, bindings: &HashMap<Variable, Term>) -> u64 {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        let mut sorted_bindings: Vec<_> = bindings.iter().collect();
        sorted_bindings.sort_by_key(|(var, _)| var.as_str());
        for (var, term) in sorted_bindings {
            var.hash(&mut hasher);
            format!("{term:?}").hash(&mut hasher);
        }
        hasher.finish()
    }
    /// Builds a signature for `query`. NOTE(review): the canonical form is
    /// the `Debug` rendering and the operation type / complexity score come
    /// from the placeholder helpers below.
    fn create_query_signature(&self, query: &Query) -> QuerySignature {
        QuerySignature {
            canonical_form: format!("{query:?}"),
            variables: query
                .select_variables
                .iter()
                .map(|v| v.as_str().to_string())
                .collect(),
            operation_type: self.determine_operation_type(query),
            complexity_score: self.calculate_complexity_score(query),
        }
    }
    /// Placeholder: every query is currently classified as SELECT.
    fn determine_operation_type(&self, _query: &Query) -> QueryOperationType {
        QueryOperationType::Select
    }
    /// Placeholder complexity score until real analysis is wired in.
    fn calculate_complexity_score(&self, _query: &Query) -> u32 {
        100
    }
    /// Placeholder validity check: everything is considered valid.
    /// TODO: compare the key's dataset version against the live dataset.
    fn is_result_cache_valid(&self, _key: &QueryResultCacheKey) -> bool {
        true
    }
    /// Updates the running-average lookup latency and the blended efficiency
    /// score (70% overall hit rate + 30% latency term, which reaches 1.0 at
    /// 0 us and 0.0 at >= 1 ms). Assumes it is called exactly once per
    /// recorded lookup, *after* the corresponding hit/miss counter was
    /// incremented, so `total_lookups >= 1` here.
    fn update_avg_lookup_time(&self, stats: &mut ArqCacheStatistics, lookup_time: Duration) {
        let total_lookups = stats.plan_cache_hits
            + stats.plan_cache_misses
            + stats.result_cache_hits
            + stats.result_cache_misses
            + stats.bgp_cache_hits
            + stats.bgp_cache_misses;
        let lookup_time_us = lookup_time.as_micros() as f64;
        if total_lookups == 1 {
            stats.avg_lookup_time_us = lookup_time_us;
        } else {
            stats.avg_lookup_time_us = (stats.avg_lookup_time_us * (total_lookups - 1) as f64
                + lookup_time_us)
                / total_lookups as f64;
        }
        let hit_rate = (stats.plan_cache_hits + stats.result_cache_hits + stats.bgp_cache_hits)
            as f64
            / total_lookups.max(1) as f64;
        stats.efficiency_score =
            hit_rate * 0.7 + (1.0 - stats.avg_lookup_time_us / 1000.0).max(0.0) * 0.3;
    }
}
// Marker-trait registrations: each key/value pair used by ArqCacheManager's
// caches must opt in to CacheKey / CacheValue.
impl CacheKey for QueryPlanCacheKey {}
impl CacheValue for CachedQueryPlan {}
impl CacheKey for QueryResultCacheKey {}
impl CacheValue for CachedQueryResult {}
impl CacheKey for BgpCacheKey {}
impl CacheValue for CachedBgpResult {}
#[cfg(test)]
mod tests {
    use super::*;
    /// A freshly built manager reports all-zero statistics.
    #[test]
    fn test_arq_cache_manager_creation() {
        let config = ArqCacheConfig::default();
        let cache_manager = ArqCacheManager::new(config);
        let stats = cache_manager.get_statistics();
        assert_eq!(stats.plan_cache_hits, 0);
        assert_eq!(stats.result_cache_hits, 0);
        assert_eq!(stats.bgp_cache_hits, 0);
        assert_eq!(stats.time_saved_ms, 0);
    }
    /// Fix: this test previously built a manager and asserted nothing.
    /// Now it verifies that BGP cache keys carry their inputs through and
    /// are deterministic — without that, lookups could never hit.
    #[test]
    fn test_cache_key_creation() {
        let config = ArqCacheConfig::default();
        let cache_manager = ArqCacheManager::new(config);
        let bindings = HashMap::new();
        let key = cache_manager.create_bgp_cache_key(42, &bindings, Some("http://example.org/g"));
        assert_eq!(key.pattern_hash, 42);
        assert_eq!(key.graph_context, Some("http://example.org/g".to_string()));
        // Same inputs must yield an identical key.
        let again = cache_manager.create_bgp_cache_key(42, &bindings, Some("http://example.org/g"));
        assert_eq!(key, again);
        // A different graph context must yield a different key.
        let other = cache_manager.create_bgp_cache_key(42, &bindings, None);
        assert_ne!(key, other);
    }
}