use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
pub use crate::view::incremental::TripleDelta;
/// Identifies a cached query result: the dataset it ran against plus a fingerprint of its text.
///
/// NOTE(review): only a 64-bit hash of the query is stored, never the query text itself, so two
/// different queries whose fingerprints collide on the same dataset map to the same key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CoreCacheKey {
    /// Dataset the query was executed against.
    pub dataset_id: String,
    /// 64-bit FNV-1a hash of the query text.
    pub query_fingerprint: u64,
}

impl CoreCacheKey {
    /// Builds a key for `query` run against `dataset_id`; the query text is hashed, not stored.
    pub fn new(dataset_id: &str, query: &str) -> Self {
        let query_fingerprint = Self::fingerprint(query);
        CoreCacheKey {
            dataset_id: String::from(dataset_id),
            query_fingerprint,
        }
    }

    /// 64-bit FNV-1a over the UTF-8 bytes of `s`: xor each byte in, then multiply by the prime
    /// (wrapping). The result depends only on the input bytes.
    fn fingerprint(s: &str) -> u64 {
        const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
        const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
        s.as_bytes()
            .iter()
            .fold(FNV_OFFSET_BASIS, |hash, &byte| {
                (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
            })
    }
}
/// A cached result set plus the bookkeeping used for expiry and invalidation.
#[derive(Debug, Clone)]
pub struct CoreCacheEntry {
    /// Key this entry is stored under.
    pub key: CoreCacheKey,
    /// Cached result rows (presumably variable name -> bound value; confirm against callers).
    pub result_rows: Vec<HashMap<String, String>>,
    /// Predicates the query read. The cache's invalidation methods treat an empty list as
    /// "may depend on any predicate".
    pub accessed_predicates: Vec<String>,
    /// When the entry was inserted.
    pub created_at: Instant,
    /// When the entry was last inserted or served as a hit.
    pub last_accessed: Instant,
    /// Instant from which the entry counts as expired.
    pub expires_at: Instant,
    /// Number of times the entry has been served as a hit.
    pub hit_count: u64,
}

impl CoreCacheEntry {
    /// Returns `true` once the current instant has reached (or passed) `expires_at`.
    pub fn is_expired(&self) -> bool {
        self.expires_at <= Instant::now()
    }

    /// Records one cache hit: refreshes `last_accessed` and increments `hit_count`.
    fn touch(&mut self) {
        self.last_accessed = Instant::now();
        self.hit_count += 1;
    }
}
/// Recency order of cached keys: index 0 is the least recently used key, the last element the
/// most recently used one.
struct LruList {
    order: Vec<CoreCacheKey>,
}

impl LruList {
    /// Creates an empty list with room for `capacity` keys.
    fn new(capacity: usize) -> Self {
        let order = Vec::with_capacity(capacity);
        LruList { order }
    }

    /// Marks `key` as most recently used: moves it to the back, or appends it if absent.
    fn touch(&mut self, key: &CoreCacheKey) {
        match self.order.iter().position(|candidate| candidate == key) {
            Some(index) => {
                let existing = self.order.remove(index);
                self.order.push(existing);
            }
            None => self.order.push(key.clone()),
        }
    }

    /// Drops every occurrence of `key`; a no-op when it is not present.
    fn remove(&mut self, key: &CoreCacheKey) {
        self.order.retain(|candidate| candidate != key);
    }

    /// Removes and returns the least recently used key, or `None` when the list is empty.
    fn pop_lru(&mut self) -> Option<CoreCacheKey> {
        if self.order.is_empty() {
            return None;
        }
        Some(self.order.remove(0))
    }

    /// Number of tracked keys.
    fn len(&self) -> usize {
        self.order.len()
    }
}
/// Mutable cache state, guarded by the `Mutex` in `CoreResultCache`.
///
/// Expected invariant: `lru.order` holds exactly the keys of `entries`, each once.
struct CacheInner {
    entries: HashMap<CoreCacheKey, CoreCacheEntry>,
    lru: LruList,
    capacity: usize,
}

impl CacheInner {
    /// Creates empty state sized for `capacity` entries.
    fn new(capacity: usize) -> Self {
        Self {
            entries: HashMap::with_capacity(capacity),
            lru: LruList::new(capacity),
            capacity,
        }
    }

    /// Evicts least-recently-used entries until there is room for one more insertion
    /// (`entries.len() < capacity`). Returns the number of keys popped from the LRU list.
    fn evict_to_capacity(&mut self) -> usize {
        let mut evicted = 0;
        while self.entries.len() >= self.capacity {
            match self.lru.pop_lru() {
                Some(lru_key) => {
                    self.entries.remove(&lru_key);
                    evicted += 1;
                }
                // Nothing left to pop: stop rather than loop forever.
                None => break,
            }
        }
        evicted
    }

    /// Removes every expired entry (per `CoreCacheEntry::is_expired`) from both the map and
    /// the LRU list, returning how many entries were removed.
    fn purge_expired(&mut self) -> usize {
        let before = self.entries.len();
        self.entries.retain(|_, entry| !entry.is_expired());
        let purged = before - self.entries.len();
        if purged > 0 {
            // A single pass over the recency list instead of one O(n) `LruList::remove` per
            // expired key (which made each purge O(n * expired)).
            let entries = &self.entries;
            self.lru.order.retain(|key| entries.contains_key(key));
        }
        purged
    }
}
pub struct CoreResultCache {
inner: Arc<Mutex<CacheInner>>,
default_ttl: Duration,
hits: Arc<AtomicU64>,
misses: Arc<AtomicU64>,
}
impl CoreResultCache {
pub fn new(capacity: usize, ttl: Duration) -> Self {
Self {
inner: Arc::new(Mutex::new(CacheInner::new(capacity.max(1)))),
default_ttl: ttl,
hits: Arc::new(AtomicU64::new(0)),
misses: Arc::new(AtomicU64::new(0)),
}
}
pub fn get(&self, key: &CoreCacheKey) -> Option<Vec<HashMap<String, String>>> {
let mut inner = self.inner.lock().expect("cache lock poisoned");
if let Some(entry) = inner.entries.get_mut(key) {
if entry.is_expired() {
let key_clone = key.clone();
inner.entries.remove(&key_clone);
inner.lru.remove(&key_clone);
self.misses.fetch_add(1, Ordering::Relaxed);
return None;
}
entry.touch();
let result = entry.result_rows.clone();
inner.lru.touch(key);
self.hits.fetch_add(1, Ordering::Relaxed);
Some(result)
} else {
self.misses.fetch_add(1, Ordering::Relaxed);
None
}
}
pub fn put(
&self,
key: CoreCacheKey,
rows: Vec<HashMap<String, String>>,
predicates: Vec<String>,
) {
self.put_with_ttl(key, rows, predicates, self.default_ttl);
}
pub fn put_with_ttl(
&self,
key: CoreCacheKey,
rows: Vec<HashMap<String, String>>,
predicates: Vec<String>,
ttl: Duration,
) {
let now = Instant::now();
let entry = CoreCacheEntry {
key: key.clone(),
result_rows: rows,
accessed_predicates: predicates,
created_at: now,
last_accessed: now,
expires_at: now + ttl,
hit_count: 0,
};
let mut inner = self.inner.lock().expect("cache lock poisoned");
inner.purge_expired();
inner.evict_to_capacity();
if inner.entries.contains_key(&key) {
inner.lru.remove(&key);
}
inner.lru.touch(&key);
inner.entries.insert(key, entry);
}
pub fn invalidate_dataset(&self, dataset_id: &str) -> usize {
let mut inner = self.inner.lock().expect("cache lock poisoned");
let to_remove: Vec<CoreCacheKey> = inner
.entries
.keys()
.filter(|k| k.dataset_id == dataset_id)
.cloned()
.collect();
let count = to_remove.len();
for key in &to_remove {
inner.entries.remove(key);
inner.lru.remove(key);
}
count
}
pub fn invalidate_predicate(&self, dataset_id: &str, predicate: &str) -> usize {
let mut inner = self.inner.lock().expect("cache lock poisoned");
let to_remove: Vec<CoreCacheKey> = inner
.entries
.iter()
.filter(|(k, e)| {
k.dataset_id == dataset_id
&& (e.accessed_predicates.is_empty()
|| e.accessed_predicates.iter().any(|p| p == predicate))
})
.map(|(k, _)| k.clone())
.collect();
let count = to_remove.len();
for key in &to_remove {
inner.entries.remove(key);
inner.lru.remove(key);
}
count
}
pub fn invalidate_on_delta(&self, dataset_id: &str, deltas: &[TripleDelta]) -> usize {
if deltas.is_empty() {
return 0;
}
let changed_predicates: std::collections::HashSet<&str> =
deltas.iter().map(|d| d.predicate()).collect();
let mut inner = self.inner.lock().expect("cache lock poisoned");
let to_remove: Vec<CoreCacheKey> = inner
.entries
.iter()
.filter(|(k, e)| {
if k.dataset_id != dataset_id {
return false;
}
if e.accessed_predicates.is_empty() {
return true; }
e.accessed_predicates
.iter()
.any(|p| changed_predicates.contains(p.as_str()))
})
.map(|(k, _)| k.clone())
.collect();
let count = to_remove.len();
for key in &to_remove {
inner.entries.remove(key);
inner.lru.remove(key);
}
count
}
pub fn hit_rate(&self) -> f64 {
let hits = self.hits.load(Ordering::Relaxed);
let misses = self.misses.load(Ordering::Relaxed);
let total = hits + misses;
if total == 0 {
0.0
} else {
hits as f64 / total as f64
}
}
pub fn size(&self) -> usize {
self.inner.lock().expect("cache lock poisoned").lru.len()
}
pub fn clear(&self) {
let mut inner = self.inner.lock().expect("cache lock poisoned");
inner.entries.clear();
inner.lru.order.clear();
}
pub fn hit_count(&self) -> u64 {
self.hits.load(Ordering::Relaxed)
}
pub fn miss_count(&self) -> u64 {
self.misses.load(Ordering::Relaxed)
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for the result cache: key fingerprinting, entry expiry, LRU eviction,
    //! invalidation by dataset / predicate / triple delta, and concurrent access.
    use super::*;
    use std::thread;

    fn make_key(dataset: &str, query: &str) -> CoreCacheKey {
        CoreCacheKey::new(dataset, query)
    }

    fn make_rows(count: usize) -> Vec<HashMap<String, String>> {
        (0..count)
            .map(|i| {
                let mut m = HashMap::new();
                m.insert("s".to_string(), format!("subject{}", i));
                m.insert("o".to_string(), format!("object{}", i));
                m
            })
            .collect()
    }

    #[test]
    fn test_cache_key_same_input_same_fingerprint() {
        let k1 = make_key("ds1", "SELECT * WHERE { ?s ?p ?o }");
        let k2 = make_key("ds1", "SELECT * WHERE { ?s ?p ?o }");
        assert_eq!(k1, k2);
        assert_eq!(k1.query_fingerprint, k2.query_fingerprint);
    }

    #[test]
    fn test_cache_key_different_datasets_different_key() {
        let k1 = make_key("ds1", "SELECT * WHERE { ?s ?p ?o }");
        let k2 = make_key("ds2", "SELECT * WHERE { ?s ?p ?o }");
        assert_ne!(k1, k2);
    }

    #[test]
    fn test_cache_key_different_queries_different_fingerprint() {
        let k1 = make_key("ds", "SELECT ?s WHERE { ?s ?p ?o }");
        let k2 = make_key("ds", "SELECT ?o WHERE { ?s ?p ?o }");
        assert_ne!(k1.query_fingerprint, k2.query_fingerprint);
    }

    #[test]
    fn test_cache_key_hash_stable() {
        let k = make_key("myds", "ASK { <s> <p> <o> }");
        let k2 = make_key("myds", "ASK { <s> <p> <o> }");
        assert_eq!(k.query_fingerprint, k2.query_fingerprint);
    }

    #[test]
    fn test_cache_entry_is_not_expired_initially() {
        let now = Instant::now();
        let entry = CoreCacheEntry {
            key: make_key("ds", "q"),
            result_rows: vec![],
            accessed_predicates: vec![],
            created_at: now,
            last_accessed: now,
            expires_at: now + Duration::from_secs(60),
            hit_count: 0,
        };
        assert!(!entry.is_expired());
    }

    #[test]
    fn test_cache_entry_is_expired_past_deadline() {
        let past = Instant::now() - Duration::from_secs(1);
        let entry = CoreCacheEntry {
            key: make_key("ds", "q"),
            result_rows: vec![],
            accessed_predicates: vec![],
            created_at: past,
            last_accessed: past,
            expires_at: past,
            hit_count: 0,
        };
        assert!(entry.is_expired());
    }

    #[test]
    fn test_cache_miss_on_empty() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        let key = make_key("ds", "SELECT * WHERE { ?s ?p ?o }");
        assert!(cache.get(&key).is_none());
        assert_eq!(cache.miss_count(), 1);
    }

    #[test]
    fn test_cache_hit_after_put() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        let key = make_key("ds", "SELECT * WHERE { ?s ?p ?o }");
        let rows = make_rows(3);
        cache.put(key.clone(), rows.clone(), vec!["http://p".to_string()]);
        let result = cache.get(&key).expect("cache hit expected");
        assert_eq!(result.len(), 3);
        assert_eq!(cache.hit_count(), 1);
        assert_eq!(cache.miss_count(), 0);
    }

    #[test]
    fn test_cache_size_increases_on_put() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        assert_eq!(cache.size(), 0);
        cache.put(make_key("ds", "q1"), make_rows(1), vec![]);
        assert_eq!(cache.size(), 1);
        cache.put(make_key("ds", "q2"), make_rows(2), vec![]);
        assert_eq!(cache.size(), 2);
    }

    #[test]
    fn test_cache_hit_rate_pure_hits() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        let key = make_key("ds", "q");
        cache.put(key.clone(), make_rows(1), vec![]);
        cache.get(&key);
        cache.get(&key);
        assert!((cache.hit_rate() - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_cache_hit_rate_mixed() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        let key = make_key("ds", "q");
        cache.put(key.clone(), make_rows(1), vec![]);
        cache.get(&key);
        cache.get(&make_key("ds", "other"));
        assert!((cache.hit_rate() - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_cache_ttl_expiration() {
        let ttl = Duration::from_millis(50);
        let cache = CoreResultCache::new(100, ttl);
        let key = make_key("ds", "q");
        let put_started = Instant::now();
        cache.put(key.clone(), make_rows(1), vec![]);
        let fresh = cache.get(&key);
        // The entry expires `ttl` after insertion, which happens no earlier than
        // `put_started`. Only demand a hit when the lookup provably happened inside that
        // window, so a scheduler stall on a loaded machine cannot make this test flaky.
        if put_started.elapsed() < ttl {
            assert!(fresh.is_some());
        }
        thread::sleep(Duration::from_millis(100));
        assert!(cache.get(&key).is_none());
    }

    #[test]
    fn test_cache_put_with_custom_ttl_expires() {
        let ttl = Duration::from_millis(30);
        let cache = CoreResultCache::new(100, Duration::from_secs(300));
        let key = make_key("ds", "custom_ttl");
        let put_started = Instant::now();
        cache.put_with_ttl(key.clone(), make_rows(1), vec![], ttl);
        let fresh = cache.get(&key);
        // Same reasoning as `test_cache_ttl_expiration`: only assert freshness when the
        // lookup is known to have happened before the entry could expire.
        if put_started.elapsed() < ttl {
            assert!(fresh.is_some());
        }
        thread::sleep(Duration::from_millis(60));
        assert!(cache.get(&key).is_none());
    }

    #[test]
    fn test_cache_zero_ttl_entry_is_never_served() {
        // Deterministic expiry check that does not depend on sleeping.
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        let key = make_key("ds", "zero_ttl");
        cache.put_with_ttl(key.clone(), make_rows(1), vec![], Duration::from_secs(0));
        assert!(cache.get(&key).is_none());
        assert_eq!(cache.miss_count(), 1);
    }

    #[test]
    fn test_cache_lru_eviction() {
        let cache = CoreResultCache::new(3, Duration::from_secs(60));
        cache.put(make_key("ds", "q1"), make_rows(1), vec![]);
        cache.put(make_key("ds", "q2"), make_rows(1), vec![]);
        cache.put(make_key("ds", "q3"), make_rows(1), vec![]);
        // Touch q1 so q2 becomes the least recently used entry.
        cache.get(&make_key("ds", "q1"));
        cache.put(make_key("ds", "q4"), make_rows(1), vec![]);
        assert!(cache.get(&make_key("ds", "q1")).is_some());
        assert!(cache.get(&make_key("ds", "q2")).is_none());
        assert!(cache.get(&make_key("ds", "q3")).is_some());
        assert!(cache.get(&make_key("ds", "q4")).is_some());
    }

    #[test]
    fn test_cache_clear() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(make_key("ds", "q1"), make_rows(1), vec![]);
        cache.put(make_key("ds", "q2"), make_rows(1), vec![]);
        assert_eq!(cache.size(), 2);
        cache.clear();
        assert_eq!(cache.size(), 0);
    }

    #[test]
    fn test_invalidate_dataset() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(make_key("dsA", "q1"), make_rows(1), vec!["p1".to_string()]);
        cache.put(make_key("dsA", "q2"), make_rows(1), vec!["p2".to_string()]);
        cache.put(make_key("dsB", "q3"), make_rows(1), vec!["p1".to_string()]);
        let removed = cache.invalidate_dataset("dsA");
        assert_eq!(removed, 2);
        assert!(cache.get(&make_key("dsA", "q1")).is_none());
        assert!(cache.get(&make_key("dsA", "q2")).is_none());
        assert!(cache.get(&make_key("dsB", "q3")).is_some());
    }

    #[test]
    fn test_invalidate_predicate_specific() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(
            make_key("ds", "q1"),
            make_rows(1),
            vec!["http://p/age".to_string()],
        );
        cache.put(
            make_key("ds", "q2"),
            make_rows(1),
            vec!["http://p/name".to_string()],
        );
        cache.put(
            make_key("ds", "q3"),
            make_rows(1),
            vec!["http://p/age".to_string(), "http://p/name".to_string()],
        );
        let removed = cache.invalidate_predicate("ds", "http://p/age");
        assert_eq!(removed, 2);
        assert!(cache.get(&make_key("ds", "q1")).is_none());
        assert!(cache.get(&make_key("ds", "q2")).is_some());
        assert!(cache.get(&make_key("ds", "q3")).is_none());
    }

    #[test]
    fn test_invalidate_predicate_wildcard_entry() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(make_key("ds", "q_wildcard"), make_rows(1), vec![]);
        let removed = cache.invalidate_predicate("ds", "http://p/anything");
        assert_eq!(removed, 1);
        assert!(cache.get(&make_key("ds", "q_wildcard")).is_none());
    }

    #[test]
    fn test_invalidate_on_delta_affects_matching_entries() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(
            make_key("ds", "q_age"),
            make_rows(1),
            vec!["http://p/age".to_string()],
        );
        cache.put(
            make_key("ds", "q_name"),
            make_rows(1),
            vec!["http://p/name".to_string()],
        );
        let deltas = vec![TripleDelta::Insert(
            "s".into(),
            "http://p/age".into(),
            "30".into(),
        )];
        let removed = cache.invalidate_on_delta("ds", &deltas);
        assert_eq!(removed, 1);
        assert!(cache.get(&make_key("ds", "q_age")).is_none());
        assert!(cache.get(&make_key("ds", "q_name")).is_some());
    }

    #[test]
    fn test_invalidate_on_delta_wildcard_entry() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(make_key("ds", "q_all"), make_rows(1), vec![]);
        let deltas = vec![TripleDelta::Delete(
            "s".into(),
            "http://p/whatever".into(),
            "o".into(),
        )];
        let removed = cache.invalidate_on_delta("ds", &deltas);
        assert_eq!(removed, 1);
    }

    #[test]
    fn test_invalidate_on_delta_empty_deltas_removes_nothing() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(make_key("ds", "q1"), make_rows(1), vec!["p".to_string()]);
        let removed = cache.invalidate_on_delta("ds", &[]);
        assert_eq!(removed, 0);
        assert!(cache.get(&make_key("ds", "q1")).is_some());
    }

    #[test]
    fn test_invalidate_on_delta_different_dataset_unaffected() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(
            make_key("dsA", "q1"),
            make_rows(1),
            vec!["http://p/age".to_string()],
        );
        cache.put(
            make_key("dsB", "q2"),
            make_rows(1),
            vec!["http://p/age".to_string()],
        );
        let deltas = vec![TripleDelta::Insert(
            "s".into(),
            "http://p/age".into(),
            "5".into(),
        )];
        let removed = cache.invalidate_on_delta("dsA", &deltas);
        assert_eq!(removed, 1);
        assert!(cache.get(&make_key("dsB", "q2")).is_some());
    }

    #[test]
    fn test_concurrent_put_and_get() {
        let cache = Arc::new(CoreResultCache::new(200, Duration::from_secs(60)));
        let mut handles = vec![];
        for i in 0..8 {
            let c = Arc::clone(&cache);
            handles.push(thread::spawn(move || {
                for j in 0..25 {
                    let key = make_key("ds", &format!("query_{}_{}", i, j));
                    c.put(key.clone(), make_rows(2), vec![]);
                    let _ = c.get(&key);
                }
            }));
        }
        for h in handles {
            h.join().expect("thread panicked");
        }
        assert!(cache.size() <= 200);
    }

    #[test]
    fn test_put_overwrites_existing_key() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        let key = make_key("ds", "q");
        cache.put(key.clone(), make_rows(1), vec![]);
        cache.put(key.clone(), make_rows(5), vec![]);
        let result = cache.get(&key).expect("hit expected");
        assert_eq!(result.len(), 5);
        assert_eq!(cache.size(), 1);
    }
}