use crate::utils::error::{Error, Result};
use lru::LruCache;
use oxigraph::sparql::QueryResults;
use oxigraph::store::Store;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};
/// The `(subject, object)` pairs recorded for a single predicate.
type SubjectObjectPairs = Vec<(String, String)>;
/// Shared, lock-protected map from predicate IRI to every `(subject, object)` pair using it.
type PredicateIndex = Arc<Mutex<HashMap<String, SubjectObjectPairs>>>;
/// Thread-safe cache of serialized SPARQL query results, plus an optional
/// per-predicate `(subject, object)` index.
///
/// Every field sits behind an [`Arc`], so cloning is cheap and yields a handle
/// that shares the same underlying state.
#[derive(Clone, Debug)]
pub struct QueryCache {
    /// LRU map from the exact query text to its cached, serialized result.
    cache: Arc<Mutex<LruCache<String, CachedResult>>>,
    /// Predicate IRI -> `(subject, object)` pairs, filled by `build_predicate_index`.
    predicate_index: PredicateIndex,
    /// Generation counter; cached entries tagged with an older value are stale.
    version: Arc<Mutex<u64>>,
}
/// A serialized query result, tagged with the cache version it was computed under.
#[derive(Clone, Debug)]
struct CachedResult {
    /// Serialized result text handed back to callers on a cache hit.
    data: String,
    /// Value of the owning cache's version counter when `data` was stored.
    version: u64,
}
impl QueryCache {
pub fn new(capacity: usize) -> Self {
Self {
cache: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::new(capacity).unwrap_or(std::num::NonZeroUsize::MIN),
))),
predicate_index: Arc::new(Mutex::new(HashMap::new())),
version: Arc::new(Mutex::new(0)),
}
}
pub fn execute_cached(&self, store: &Store, query_str: &str) -> Result<String> {
let current_version = *self.version.lock().unwrap_or_else(|e| e.into_inner());
{
let mut cache = self.cache.lock().unwrap_or_else(|e| e.into_inner());
if let Some(cached) = cache.get(query_str) {
if cached.version == current_version {
return Ok(cached.data.clone());
}
}
}
let results = self.execute_query(store, query_str)?;
{
let mut cache = self.cache.lock().unwrap_or_else(|e| e.into_inner());
cache.put(
query_str.to_string(),
CachedResult {
data: results.clone(),
version: current_version,
},
);
}
Ok(results)
}
#[allow(deprecated)]
fn execute_query(&self, store: &Store, query_str: &str) -> Result<String> {
match store.query(query_str) {
Ok(QueryResults::Solutions(solutions)) => {
let mut results = Vec::new();
for solution in solutions {
let solution = solution.map_err(|e| {
Error::with_context("Query execution failed", &e.to_string())
})?;
let mut row = HashMap::new();
for (var, term) in solution.iter() {
row.insert(var.as_str().to_string(), term.to_string());
}
results.push(row);
}
serde_json::to_string(&results)
.map_err(|e| Error::with_context("Failed to serialize results", &e.to_string()))
}
Ok(QueryResults::Boolean(b)) => Ok(serde_json::json!({ "boolean": b }).to_string()),
Ok(QueryResults::Graph(_)) => {
Err(Error::new("Graph query results not yet supported in cache"))
}
Err(e) => Err(Error::with_context(
"Query execution failed",
&e.to_string(),
)),
}
}
pub fn invalidate(&self) {
let mut version = self.version.lock().unwrap_or_else(|e| e.into_inner());
*version += 1;
}
pub fn clear(&self) {
let mut cache = self.cache.lock().unwrap_or_else(|e| e.into_inner());
cache.clear();
let mut index = self
.predicate_index
.lock()
.unwrap_or_else(|e| e.into_inner());
index.clear();
}
pub fn build_predicate_index(&self, store: &Store, predicates: &[&str]) -> Result<()> {
use crate::rdf::query_builder::{Iri, SparqlQueryBuilder, Variable};
let mut index = self
.predicate_index
.lock()
.unwrap_or_else(|e| e.into_inner());
for predicate in predicates {
let predicate_iri = Iri::new(*predicate)
.map_err(|e| Error::with_context("Invalid predicate IRI", &e.to_string()))?;
let query = SparqlQueryBuilder::select()
.var(Variable::new("s").map_err(|e| {
Error::with_context("Failed to create variable 's'", &e.to_string())
})?)
.var(Variable::new("o").map_err(|e| {
Error::with_context("Failed to create variable 'o'", &e.to_string())
})?)
.where_pattern(format!("?s <{}> ?o", predicate_iri.as_str()))
.build()
.map_err(|e| {
Error::with_context("Failed to build predicate index query", &e.to_string())
})?;
let results = self.execute_query(store, &query)?;
let parsed: Vec<HashMap<String, String>> =
serde_json::from_str(&results).map_err(|e| {
Error::with_context("Failed to parse index results", &e.to_string())
})?;
let entries: Vec<(String, String)> = parsed
.into_iter()
.filter_map(|mut row| {
let s = row.remove("s")?;
let o = row.remove("o")?;
Some((s, o))
})
.collect();
index.insert(predicate.to_string(), entries);
}
Ok(())
}
pub fn query_indexed(&self, predicate: &str) -> Option<Vec<(String, String)>> {
let index = self
.predicate_index
.lock()
.unwrap_or_else(|e| e.into_inner());
index.get(predicate).cloned()
}
pub fn stats(&self) -> CacheStats {
let cache = self.cache.lock().unwrap_or_else(|e| e.into_inner());
let index = self
.predicate_index
.lock()
.unwrap_or_else(|e| e.into_inner());
CacheStats {
cache_size: cache.len(),
cache_capacity: cache.cap().get(),
indexed_predicates: index.len(),
cache_version: *self.version.lock().unwrap_or_else(|e| e.into_inner()),
}
}
}
/// Point-in-time snapshot of a query cache's occupancy, as returned by
/// `QueryCache::stats`.
///
/// `PartialEq`/`Eq` are derived so snapshots can be compared directly, e.g. in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheStats {
    /// Number of query results currently held in the LRU cache, fresh or stale.
    pub cache_size: usize,
    /// Maximum number of query results the cache holds before evicting.
    pub cache_capacity: usize,
    /// Number of predicates that currently have an entry in the predicate index.
    pub indexed_predicates: usize,
    /// Current invalidation generation: starts at 0 and increases by one per `invalidate` call.
    pub cache_version: u64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh cache is empty, reports the requested capacity and starts at version 0.
    #[test]
    fn test_query_cache_creation() {
        let CacheStats {
            cache_size,
            cache_capacity,
            indexed_predicates,
            cache_version,
        } = QueryCache::new(100).stats();
        assert_eq!(cache_size, 0);
        assert_eq!(cache_capacity, 100);
        assert_eq!(indexed_predicates, 0);
        assert_eq!(cache_version, 0);
    }

    /// Each `invalidate` call advances the reported version by exactly one.
    #[test]
    fn test_cache_invalidation() {
        let cache = QueryCache::new(100);
        assert_eq!(cache.stats().cache_version, 0);
        for expected in 1..=2u64 {
            cache.invalidate();
            assert_eq!(cache.stats().cache_version, expected);
        }
    }

    /// `clear` leaves both the result cache and the predicate index empty.
    #[test]
    fn test_cache_clear() {
        let cache = QueryCache::new(100);
        cache.invalidate();
        cache.clear();
        let snapshot = cache.stats();
        assert_eq!((snapshot.cache_size, snapshot.indexed_predicates), (0, 0));
    }
}