use parking_lot::Mutex;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use crate::query::plan::LogicalPlan;
use crate::query::processor::QueryLanguage;
/// Key identifying a cached query plan: the whitespace-normalized query
/// text, the query language, and an optional target graph name.
///
/// Derives `Eq + Hash` so it can key the internal `LruCache` maps.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct CacheKey {
    // Query text after `normalize_query` (runs of whitespace collapsed),
    // so formatting differences map to the same cache entry.
    query: String,
    // Language the query text is written in.
    language: QueryLanguage,
    // Graph the query is scoped to; `None` means the default graph.
    graph: Option<String>,
}
impl CacheKey {
    /// Creates a key for `query` in `language`, scoped to no particular
    /// graph. The text is whitespace-normalized so formatting differences
    /// do not produce distinct cache entries.
    #[must_use]
    pub fn new(query: impl Into<String>, language: QueryLanguage) -> Self {
        // Delegate to `with_graph` so normalization lives in one place.
        Self::with_graph(query, language, None)
    }

    /// Creates a key scoped to an optional graph name.
    #[must_use]
    pub fn with_graph(
        query: impl Into<String>,
        language: QueryLanguage,
        graph: Option<String>,
    ) -> Self {
        Self {
            query: normalize_query(&query.into()),
            language,
            graph,
        }
    }

    /// Returns the normalized query text.
    #[must_use]
    pub fn query(&self) -> &str {
        &self.query
    }

    /// Returns the query language.
    #[must_use]
    pub fn language(&self) -> QueryLanguage {
        self.language
    }

    /// Returns the graph name this key is scoped to, if any.
    /// (Added for symmetry with `with_graph`; previously the field was
    /// write-only from the caller's perspective.)
    #[must_use]
    pub fn graph(&self) -> Option<&str> {
        self.graph.as_deref()
    }
}
/// Collapses every run of whitespace (spaces, tabs, newlines) into a single
/// space and strips leading/trailing whitespace, so textually equivalent
/// queries share one cache key.
fn normalize_query(query: &str) -> String {
    let mut normalized = String::with_capacity(query.len());
    for (i, token) in query.split_whitespace().enumerate() {
        if i > 0 {
            normalized.push(' ');
        }
        normalized.push_str(token);
    }
    normalized
}
/// A cached value plus access bookkeeping. The wall-clock timestamp is
/// omitted on wasm32, where `Instant::now` is unreliable.
struct CacheEntry<T> {
    value: T,
    // How many times `access` has returned this entry.
    access_count: u64,
    #[cfg(not(target_arch = "wasm32"))]
    last_accessed: Instant,
}

impl<T> CacheEntry<T> {
    /// Wraps `value` in a fresh, never-accessed entry.
    ///
    /// Construction does not need `T: Clone` (only `access` does), so the
    /// bound previously placed on the whole impl block has been relaxed.
    fn new(value: T) -> Self {
        Self {
            value,
            access_count: 0,
            #[cfg(not(target_arch = "wasm32"))]
            last_accessed: Instant::now(),
        }
    }
}

impl<T: Clone> CacheEntry<T> {
    /// Records an access (count and, off-wasm, timestamp) and returns a
    /// clone of the stored value.
    fn access(&mut self) -> T {
        self.access_count += 1;
        #[cfg(not(target_arch = "wasm32"))]
        {
            self.last_accessed = Instant::now();
        }
        self.value.clone()
    }
}

/// Minimal LRU cache: a `HashMap` for storage plus a `Vec` of keys ordered
/// least- to most-recently used. Recency updates are O(n) in the entry
/// count, which is acceptable for the small per-tier capacities used here.
struct LruCache<K, V> {
    entries: HashMap<K, CacheEntry<V>>,
    capacity: usize,
    // Keys from least (front) to most (back) recently used.
    access_order: Vec<K>,
}

impl<K: Clone + Eq + Hash, V: Clone> LruCache<K, V> {
    /// Creates an empty cache holding at most `capacity` entries.
    /// A capacity of zero yields a cache that stores nothing.
    fn new(capacity: usize) -> Self {
        Self {
            entries: HashMap::with_capacity(capacity),
            capacity,
            access_order: Vec::with_capacity(capacity),
        }
    }

    /// Returns a clone of the value for `key`, marking it most recently
    /// used; `None` if absent.
    fn get(&mut self, key: &K) -> Option<V> {
        if let Some(entry) = self.entries.get_mut(key) {
            // Move the key to the back (most recently used position).
            if let Some(pos) = self.access_order.iter().position(|k| k == key) {
                self.access_order.remove(pos);
            }
            self.access_order.push(key.clone());
            Some(entry.access())
        } else {
            None
        }
    }

    /// Inserts `key -> value`, evicting the least-recently-used entry when
    /// the cache is full. Replacing an existing key resets its entry's
    /// access bookkeeping.
    fn put(&mut self, key: K, value: V) {
        // BUG FIX: a zero-capacity cache previously still accepted one
        // entry (`len() >= 0` is always true, eviction on an empty cache is
        // a no-op, and the insert went through). Reject writes up front so
        // capacity 0 really means "store nothing".
        if self.capacity == 0 {
            return;
        }
        if self.entries.len() >= self.capacity && !self.entries.contains_key(&key) {
            self.evict_lru();
        }
        if let Some(pos) = self.access_order.iter().position(|k| k == &key) {
            self.access_order.remove(pos);
        }
        self.access_order.push(key.clone());
        self.entries.insert(key, CacheEntry::new(value));
    }

    /// Drops the least-recently-used entry (front of `access_order`).
    fn evict_lru(&mut self) {
        if let Some(key) = self.access_order.first().cloned() {
            self.access_order.remove(0);
            self.entries.remove(&key);
        }
    }

    /// Removes all entries, keeping allocated capacity.
    fn clear(&mut self) {
        self.entries.clear();
        self.access_order.clear();
    }

    /// Number of entries currently stored.
    fn len(&self) -> usize {
        self.entries.len()
    }

    /// Removes `key`, returning its value if it was present.
    fn remove(&mut self, key: &K) -> Option<V> {
        if let Some(pos) = self.access_order.iter().position(|k| k == key) {
            self.access_order.remove(pos);
        }
        self.entries.remove(key).map(|e| e.value)
    }

    /// Rough heap-usage estimate in bytes, based on reserved capacity (the
    /// `+ 1` approximates per-slot hash-map metadata). Heap data owned by
    /// keys/values themselves (e.g. `String` buffers) is not counted.
    fn heap_memory_bytes(&self) -> usize {
        let entry_size = std::mem::size_of::<K>() + std::mem::size_of::<CacheEntry<V>>() + 1;
        let map_bytes = self.entries.capacity() * entry_size;
        let vec_bytes = self.access_order.capacity() * std::mem::size_of::<K>();
        map_bytes + vec_bytes
    }
}
/// Two-tier, thread-safe cache of logical query plans with hit/miss
/// statistics. One tier holds parsed (pre-optimization) plans, the other
/// optimized plans; each tier is an independent LRU cache behind its own
/// mutex.
pub struct QueryCache {
    // LRU cache of parsed plans, keyed by normalized query text.
    parsed_cache: Mutex<LruCache<CacheKey, LogicalPlan>>,
    // LRU cache of optimized plans.
    optimized_cache: Mutex<LruCache<CacheKey, LogicalPlan>>,
    // Lock-free counters so reads/writes on the hot path stay cheap.
    parsed_hits: AtomicU64,
    parsed_misses: AtomicU64,
    optimized_hits: AtomicU64,
    optimized_misses: AtomicU64,
    invalidations: AtomicU64,
    // When false (see `disabled()`), all gets miss and all puts are no-ops.
    enabled: bool,
}
impl QueryCache {
    /// Creates an enabled cache. `capacity` is the total entry budget,
    /// split evenly between the parsed and optimized tiers with a minimum
    /// of one entry per tier.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        let half_capacity = capacity / 2;
        Self {
            parsed_cache: Mutex::new(LruCache::new(half_capacity.max(1))),
            optimized_cache: Mutex::new(LruCache::new(half_capacity.max(1))),
            parsed_hits: AtomicU64::new(0),
            parsed_misses: AtomicU64::new(0),
            optimized_hits: AtomicU64::new(0),
            optimized_misses: AtomicU64::new(0),
            invalidations: AtomicU64::new(0),
            enabled: true,
        }
    }

    /// Creates a cache that stores nothing: gets return `None` and puts are
    /// no-ops, with no statistics recorded.
    #[must_use]
    pub fn disabled() -> Self {
        Self {
            parsed_cache: Mutex::new(LruCache::new(0)),
            optimized_cache: Mutex::new(LruCache::new(0)),
            parsed_hits: AtomicU64::new(0),
            parsed_misses: AtomicU64::new(0),
            optimized_hits: AtomicU64::new(0),
            optimized_misses: AtomicU64::new(0),
            invalidations: AtomicU64::new(0),
            enabled: false,
        }
    }

    /// Returns `true` unless this cache was built with [`Self::disabled`].
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        self.enabled
    }

    /// Looks up a parsed plan, recording a hit or miss. A disabled cache
    /// always returns `None` and records nothing.
    pub fn get_parsed(&self, key: &CacheKey) -> Option<LogicalPlan> {
        if !self.enabled {
            return None;
        }
        let result = self.parsed_cache.lock().get(key);
        if result.is_some() {
            self.parsed_hits.fetch_add(1, Ordering::Relaxed);
        } else {
            self.parsed_misses.fetch_add(1, Ordering::Relaxed);
        }
        result
    }

    /// Stores a parsed plan; a no-op when the cache is disabled.
    pub fn put_parsed(&self, key: CacheKey, plan: LogicalPlan) {
        if !self.enabled {
            return;
        }
        self.parsed_cache.lock().put(key, plan);
    }

    /// Looks up an optimized plan, recording a hit or miss. A disabled
    /// cache always returns `None` and records nothing.
    pub fn get_optimized(&self, key: &CacheKey) -> Option<LogicalPlan> {
        if !self.enabled {
            return None;
        }
        let result = self.optimized_cache.lock().get(key);
        if result.is_some() {
            self.optimized_hits.fetch_add(1, Ordering::Relaxed);
        } else {
            self.optimized_misses.fetch_add(1, Ordering::Relaxed);
        }
        result
    }

    /// Stores an optimized plan; a no-op when the cache is disabled.
    pub fn put_optimized(&self, key: CacheKey, plan: LogicalPlan) {
        if !self.enabled {
            return;
        }
        self.optimized_cache.lock().put(key, plan);
    }

    /// Removes `key` from both tiers.
    ///
    /// FIX: this now counts toward the `invalidations` statistic whenever
    /// either tier actually held the key; previously only `clear` was
    /// reflected in that counter, making per-key invalidations invisible.
    pub fn invalidate(&self, key: &CacheKey) {
        let removed_parsed = self.parsed_cache.lock().remove(key).is_some();
        let removed_optimized = self.optimized_cache.lock().remove(key).is_some();
        if removed_parsed || removed_optimized {
            self.invalidations.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Empties both tiers, counting one invalidation if anything was stored.
    pub fn clear(&self) {
        // Hold both guards across the check and the clears so the emptiness
        // test and the mutation are atomic. The old code re-acquired each
        // lock twice, so another thread could insert between the check and
        // the clear and the counter could go stale. Lock order (parsed,
        // then optimized) matches `heap_memory_bytes` to avoid deadlock.
        let mut parsed = self.parsed_cache.lock();
        let mut optimized = self.optimized_cache.lock();
        let had_entries = parsed.len() > 0 || optimized.len() > 0;
        parsed.clear();
        optimized.clear();
        if had_entries {
            self.invalidations.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Snapshot of current sizes and hit/miss/invalidation counters.
    #[must_use]
    pub fn stats(&self) -> CacheStats {
        CacheStats {
            parsed_size: self.parsed_cache.lock().len(),
            optimized_size: self.optimized_cache.lock().len(),
            parsed_hits: self.parsed_hits.load(Ordering::Relaxed),
            parsed_misses: self.parsed_misses.load(Ordering::Relaxed),
            optimized_hits: self.optimized_hits.load(Ordering::Relaxed),
            optimized_misses: self.optimized_misses.load(Ordering::Relaxed),
            invalidations: self.invalidations.load(Ordering::Relaxed),
        }
    }

    /// Rough heap usage as `(parsed_bytes, optimized_bytes, total_entries)`.
    /// Byte figures are capacity-based estimates, not exact allocations.
    #[must_use]
    pub fn heap_memory_bytes(&self) -> (usize, usize, usize) {
        let parsed = self.parsed_cache.lock();
        let optimized = self.optimized_cache.lock();
        let parsed_bytes = parsed.heap_memory_bytes();
        let optimized_bytes = optimized.heap_memory_bytes();
        let count = parsed.len() + optimized.len();
        (parsed_bytes, optimized_bytes, count)
    }

    /// Resets all counters to zero; cached entries are left in place.
    pub fn reset_stats(&self) {
        self.parsed_hits.store(0, Ordering::Relaxed);
        self.parsed_misses.store(0, Ordering::Relaxed);
        self.optimized_hits.store(0, Ordering::Relaxed);
        self.optimized_misses.store(0, Ordering::Relaxed);
        self.invalidations.store(0, Ordering::Relaxed);
    }
}
impl Default for QueryCache {
fn default() -> Self {
Self::new(1000)
}
}
/// Point-in-time snapshot of cache sizes and hit/miss counters, as
/// returned by `QueryCache::stats`.
#[derive(Debug, Clone)]
pub struct CacheStats {
    pub parsed_size: usize,
    pub optimized_size: usize,
    pub parsed_hits: u64,
    pub parsed_misses: u64,
    pub optimized_hits: u64,
    pub optimized_misses: u64,
    pub invalidations: u64,
}

/// Fraction of lookups that hit, in `[0.0, 1.0]`; `0.0` when there were no
/// lookups at all.
fn hit_rate(hits: u64, misses: u64) -> f64 {
    let lookups = hits + misses;
    if lookups == 0 {
        0.0
    } else {
        hits as f64 / lookups as f64
    }
}

impl CacheStats {
    /// Hit rate of the parsed-plan tier.
    #[must_use]
    pub fn parsed_hit_rate(&self) -> f64 {
        hit_rate(self.parsed_hits, self.parsed_misses)
    }

    /// Hit rate of the optimized-plan tier.
    #[must_use]
    pub fn optimized_hit_rate(&self) -> f64 {
        hit_rate(self.optimized_hits, self.optimized_misses)
    }

    /// Total number of entries across both tiers.
    #[must_use]
    pub fn total_size(&self) -> usize {
        self.parsed_size + self.optimized_size
    }

    /// Combined hit rate over both tiers.
    #[must_use]
    pub fn total_hit_rate(&self) -> f64 {
        hit_rate(
            self.parsed_hits + self.optimized_hits,
            self.parsed_misses + self.optimized_misses,
        )
    }
}
/// Pairs a query processor with a `QueryCache` so callers can consult the
/// cache alongside the wrapped processor.
pub struct CachingQueryProcessor<P> {
    // The wrapped (inner) query processor.
    processor: P,
    // Plan cache associated with this processor.
    cache: QueryCache,
}
impl<P> CachingQueryProcessor<P> {
pub fn new(processor: P, cache: QueryCache) -> Self {
Self { processor, cache }
}
pub fn with_default_cache(processor: P) -> Self {
Self::new(processor, QueryCache::default())
}
#[must_use]
pub fn cache(&self) -> &QueryCache {
&self.cache
}
#[must_use]
pub fn processor(&self) -> &P {
&self.processor
}
#[must_use]
pub fn stats(&self) -> CacheStats {
self.cache.stats()
}
pub fn clear_cache(&self) {
self.cache.clear();
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Returns whichever query language is compiled in so the suite builds
    // under any feature combination (GQL preferred).
    #[cfg(feature = "gql")]
    fn test_language() -> QueryLanguage {
        QueryLanguage::Gql
    }

    // Fallback when `gql` is off: Cypher first, then SPARQL.
    // NOTE(review): if neither `cypher` nor `sparql` is enabled this body
    // has no return path and will not compile — presumably at least one
    // language feature is always on; confirm against the crate's features.
    #[cfg(not(feature = "gql"))]
    fn test_language() -> QueryLanguage {
        #[cfg(feature = "cypher")]
        return QueryLanguage::Cypher;
        #[cfg(feature = "sparql")]
        return QueryLanguage::Sparql;
    }

    // Identically-written queries must normalize to identical key text.
    #[test]
    fn test_cache_key_normalization() {
        let key1 = CacheKey::new("MATCH (n) RETURN n", test_language());
        let key2 = CacheKey::new("MATCH (n) RETURN n", test_language());
        assert_eq!(key1.query(), key2.query());
    }

    // Miss, insert, then hit on the parsed tier; stats record one of each.
    #[test]
    fn test_cache_basic_operations() {
        let cache = QueryCache::new(10);
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());
        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);
        assert!(cache.get_parsed(&key).is_none());
        cache.put_parsed(key.clone(), plan.clone());
        assert!(cache.get_parsed(&key).is_some());
        let stats = cache.stats();
        assert_eq!(stats.parsed_size, 1);
        assert_eq!(stats.parsed_hits, 1);
        assert_eq!(stats.parsed_misses, 1);
    }

    // Total capacity 4 gives each tier room for 2 entries, so inserting a
    // third plan evicts the least-recently-used one (QUERY 0).
    #[test]
    fn test_cache_lru_eviction() {
        let cache = QueryCache::new(4);
        use crate::query::plan::{LogicalOperator, LogicalPlan};
        for i in 0..3 {
            let key = CacheKey::new(format!("QUERY {}", i), test_language());
            cache.put_parsed(key, LogicalPlan::new(LogicalOperator::Empty));
        }
        let key0 = CacheKey::new("QUERY 0", test_language());
        assert!(cache.get_parsed(&key0).is_none());
        let key1 = CacheKey::new("QUERY 1", test_language());
        let key2 = CacheKey::new("QUERY 2", test_language());
        assert!(cache.get_parsed(&key1).is_some());
        assert!(cache.get_parsed(&key2).is_some());
    }

    // Invalidating a key removes it from both tiers.
    #[test]
    fn test_cache_invalidation() {
        let cache = QueryCache::new(10);
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());
        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);
        cache.put_parsed(key.clone(), plan.clone());
        cache.put_optimized(key.clone(), plan);
        assert!(cache.get_parsed(&key).is_some());
        assert!(cache.get_optimized(&key).is_some());
        cache.invalidate(&key);
        // Reset so the hits recorded above don't leak into later asserts.
        cache.reset_stats();
        assert!(cache.get_parsed(&key).is_none());
        assert!(cache.get_optimized(&key).is_none());
    }

    // A disabled cache ignores puts and records no entries.
    #[test]
    fn test_cache_disabled() {
        let cache = QueryCache::disabled();
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());
        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);
        cache.put_parsed(key.clone(), plan);
        assert!(cache.get_parsed(&key).is_none());
        let stats = cache.stats();
        assert_eq!(stats.parsed_size, 0);
    }

    // Two hits and two misses on the optimized tier -> 50% hit rate.
    #[test]
    fn test_cache_stats() {
        let cache = QueryCache::new(10);
        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let key1 = CacheKey::new("QUERY 1", test_language());
        let key2 = CacheKey::new("QUERY 2", test_language());
        let plan = LogicalPlan::new(LogicalOperator::Empty);
        cache.get_optimized(&key1);
        cache.put_optimized(key1.clone(), plan);
        cache.get_optimized(&key1);
        cache.get_optimized(&key1);
        cache.get_optimized(&key2);
        let stats = cache.stats();
        assert_eq!(stats.optimized_hits, 2);
        assert_eq!(stats.optimized_misses, 2);
        assert!((stats.optimized_hit_rate() - 0.5).abs() < 0.01);
    }
}
}