use crate::graph::types::CachedResult;
use crate::utils::error::{Error, Result};
use ahash::AHasher;
use lru::LruCache;
use oxigraph::io::RdfFormat;
use oxigraph::model::{GraphName, NamedNode, NamedOrBlankNode, Quad, Term};
use oxigraph::sparql::{QueryResults, SparqlEvaluator};
use oxigraph::store::Store;
use std::collections::BTreeMap;
use std::fs::File;
use std::hash::{Hash, Hasher};
use std::io::{BufReader, Cursor};
use std::num::NonZeroUsize;
use std::path::Path;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
};
/// Capacity of the plan cache: how many distinct query hashes are retained.
const DEFAULT_PLAN_CACHE_SIZE: usize = 100;
/// Capacity of the result cache: how many `(query hash, epoch)` entries are retained.
const DEFAULT_RESULT_CACHE_SIZE: usize = 1000;
/// Epoch value a newly constructed `Graph` starts at.
const INITIAL_EPOCH: u64 = 1;
/// Amount the epoch advances by on every `bump_epoch` call.
const EPOCH_INCREMENT: u64 = 1;
/// RDF graph handle wrapping an oxigraph `Store` together with query caches.
///
/// Every field sits behind an `Arc`, and the `Clone` impl copies only those
/// `Arc`s, so clones observe the same store, epoch, and caches.
pub struct Graph {
/// Underlying oxigraph store holding the quads.
inner: Arc<Store>,
/// Counter advanced by `bump_epoch` after mutations; second half of the result-cache key.
epoch: Arc<AtomicU64>,
/// LRU map from query hash to the query text.
plan_cache: Arc<Mutex<LruCache<u64, String>>>,
/// LRU map from `(query hash, epoch)` to materialized query results.
result_cache: Arc<Mutex<LruCache<(u64, u64), CachedResult>>>,
}
impl Graph {
pub fn new() -> Result<Self> {
let plan_cache_size = NonZeroUsize::new(DEFAULT_PLAN_CACHE_SIZE)
.ok_or_else(|| Error::new("Invalid cache size"))?;
let result_cache_size = NonZeroUsize::new(DEFAULT_RESULT_CACHE_SIZE)
.ok_or_else(|| Error::new("Invalid cache size"))?;
let store =
Store::new().map_err(|e| Error::new(&format!("Failed to create store: {}", e)))?;
Ok(Self {
inner: Arc::new(store),
epoch: Arc::new(AtomicU64::new(INITIAL_EPOCH)),
plan_cache: Arc::new(Mutex::new(LruCache::new(plan_cache_size))),
result_cache: Arc::new(Mutex::new(LruCache::new(result_cache_size))),
})
}
/// Builds a new graph and fills it from the RDF file at `path`.
///
/// The serialization format is picked by `load_path` from the file extension.
///
/// # Errors
/// Propagates any failure from `Graph::new` or `load_path`.
pub fn load_from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
    Self::new().and_then(|loaded| {
        loaded.load_path(path)?;
        Ok(loaded)
    })
}
pub fn load_from_string(ttl: &str) -> Result<Self> {
let graph = Self::new()?;
graph
.inner
.load_from_reader(RdfFormat::Turtle, Cursor::new(ttl))
.map_err(|e| Error::new(&format!("Failed to load RDF from string: {}", e)))?;
graph.bump_epoch();
Ok(graph)
}
/// Returns the current mutation epoch, which `query_cached` pairs with the query
/// hash to form result-cache keys.
///
/// The load uses `Acquire` ordering: whenever the epoch bump is published with
/// release semantics, a thread that observes the new epoch here also observes
/// the store mutation that preceded that bump, so fresh cache keys are never
/// filled from a pre-mutation view of the store.
pub(crate) fn current_epoch(&self) -> u64 {
    self.epoch.load(Ordering::Acquire)
}
/// Advances the mutation epoch by `EPOCH_INCREMENT`.
///
/// Result-cache keys include the epoch, so entries stored under earlier epochs
/// are no longer looked up after this call.
///
/// The increment uses `Release` ordering so that the store mutation performed
/// before the bump is visible to any thread that reads the new epoch with an
/// acquiring load.
pub(crate) fn bump_epoch(&self) {
    self.epoch.fetch_add(EPOCH_INCREMENT, Ordering::Release);
}
/// Borrows the underlying oxigraph `Store`.
///
/// Mutating the store through this reference does not bump the epoch.
pub(crate) fn inner(&self) -> &Store {
    self.inner.as_ref()
}
pub(crate) fn from_store(store: Arc<Store>) -> Result<Self> {
let plan_cache_size = NonZeroUsize::new(DEFAULT_PLAN_CACHE_SIZE)
.ok_or_else(|| Error::new("Invalid cache size"))?;
let result_cache_size = NonZeroUsize::new(DEFAULT_RESULT_CACHE_SIZE)
.ok_or_else(|| Error::new("Invalid cache size"))?;
Ok(Self {
inner: store,
epoch: Arc::new(AtomicU64::new(INITIAL_EPOCH)),
plan_cache: Arc::new(Mutex::new(LruCache::new(plan_cache_size))),
result_cache: Arc::new(Mutex::new(LruCache::new(result_cache_size))),
})
}
/// Hashes the SPARQL text into the 64-bit value used in cache keys.
///
/// This hash is the only query-derived part of those keys, so two queries with
/// the same hash are indistinguishable to the caches.
fn hash_query(&self, sparql: &str) -> u64 {
    let mut state = AHasher::default();
    Hash::hash(sparql, &mut state);
    Hasher::finish(&state)
}
fn materialize_results(&self, results: QueryResults) -> Result<CachedResult> {
match results {
QueryResults::Boolean(b) => Ok(CachedResult::Boolean(b)),
QueryResults::Solutions(solutions) => {
let mut rows = Vec::new();
for solution in solutions {
let solution = solution
.map_err(|e| Error::new(&format!("SPARQL solution error: {}", e)))?;
let mut row = BTreeMap::new();
for (var, term) in solution.iter() {
let var_name = var.as_str().trim_start_matches('?').to_string();
row.insert(var_name, term.to_string());
}
rows.push(row);
}
Ok(CachedResult::Solutions(rows))
}
QueryResults::Graph(quads) => {
let mut triples = Vec::new();
for q in quads {
let quad = q.map_err(|e| Error::new(&format!("Quad error: {}", e)))?;
triples.push(quad.to_string());
}
Ok(CachedResult::Graph(triples))
}
}
}
pub fn insert_turtle(&self, turtle: &str) -> Result<()> {
self.inner
.load_from_reader(RdfFormat::Turtle, turtle.as_bytes())
.map_err(|e| Error::new(&format!("Failed to load Turtle: {}", e)))?;
self.bump_epoch();
Ok(())
}
pub fn insert_turtle_with_base(&self, turtle: &str, base_iri: &str) -> Result<()> {
let base_iri_trimmed = base_iri.trim();
let turtle_with_base = if turtle.trim_start().starts_with("BASE")
|| turtle.trim_start().starts_with("@base")
{
turtle.to_string()
} else {
format!("BASE <{}>\n{}", base_iri_trimmed, turtle)
};
self.inner
.load_from_reader(RdfFormat::Turtle, turtle_with_base.as_bytes())
.map_err(|e| Error::new(&format!("Failed to load Turtle with base IRI: {}", e)))?;
self.bump_epoch();
Ok(())
}
pub fn insert_turtle_in(&self, turtle: &str, graph_iri: &str) -> Result<()> {
let temp_store = Store::new()
.map_err(|e| Error::new(&format!("Failed to create temporary store: {}", e)))?;
temp_store
.load_from_reader(RdfFormat::Turtle, turtle.as_bytes())
.map_err(|e| Error::new(&format!("Failed to parse Turtle: {}", e)))?;
let graph_name = GraphName::NamedNode(
NamedNode::new(graph_iri)
.map_err(|e| Error::new(&format!("Invalid graph IRI: {}", e)))?,
);
let quads: Vec<Quad> = temp_store
.quads_for_pattern(None, None, None, None)
.collect::<std::result::Result<Vec<_>, _>>()?;
for quad in quads {
let named_quad = Quad {
subject: quad.subject.clone(),
predicate: quad.predicate.clone(),
object: quad.object.clone(),
graph_name: graph_name.clone(),
};
self.inner.insert(&named_quad).map_err(|e| {
Error::new(&format!("Failed to insert quad into named graph: {}", e))
})?;
}
self.bump_epoch();
Ok(())
}
pub fn insert_quad(&self, s: &str, p: &str, o: &str) -> Result<()> {
let s =
NamedNode::new(s).map_err(|e| Error::new(&format!("Invalid subject IRI: {}", e)))?;
let p =
NamedNode::new(p).map_err(|e| Error::new(&format!("Invalid predicate IRI: {}", e)))?;
let o = NamedNode::new(o).map_err(|e| Error::new(&format!("Invalid object IRI: {}", e)))?;
self.inner
.insert(&Quad::new(s, p, o, GraphName::DefaultGraph))
.map_err(|e| Error::new(&format!("Failed to insert quad: {}", e)))?;
self.bump_epoch();
Ok(())
}
pub fn insert_quad_in(&self, s: &str, p: &str, o: &str, graph_iri: &str) -> Result<()> {
let s =
NamedNode::new(s).map_err(|e| Error::new(&format!("Invalid subject IRI: {}", e)))?;
let p =
NamedNode::new(p).map_err(|e| Error::new(&format!("Invalid predicate IRI: {}", e)))?;
let o = NamedNode::new(o).map_err(|e| Error::new(&format!("Invalid object IRI: {}", e)))?;
let g = GraphName::NamedNode(
NamedNode::new(graph_iri)
.map_err(|e| Error::new(&format!("Invalid graph IRI: {}", e)))?,
);
self.inner
.insert(&Quad::new(s, p, o, g))
.map_err(|e| Error::new(&format!("Failed to insert quad into named graph: {}", e)))?;
self.bump_epoch();
Ok(())
}
pub fn insert_quad_object(&self, quad: &Quad) -> Result<()> {
self.inner
.insert(quad)
.map_err(|e| Error::new(&format!("Failed to insert quad: {}", e)))?;
self.bump_epoch();
Ok(())
}
pub fn remove_quad(&self, s: &str, p: &str, o: &str) -> Result<()> {
let s =
NamedNode::new(s).map_err(|e| Error::new(&format!("Invalid subject IRI: {}", e)))?;
let p =
NamedNode::new(p).map_err(|e| Error::new(&format!("Invalid predicate IRI: {}", e)))?;
let o = NamedNode::new(o).map_err(|e| Error::new(&format!("Invalid object IRI: {}", e)))?;
self.inner
.remove(&Quad::new(s, p, o, GraphName::DefaultGraph))
.map_err(|e| Error::new(&format!("Failed to remove quad: {}", e)))?;
self.bump_epoch();
Ok(())
}
pub fn remove_quad_from(&self, s: &str, p: &str, o: &str, graph_iri: &str) -> Result<()> {
let s =
NamedNode::new(s).map_err(|e| Error::new(&format!("Invalid subject IRI: {}", e)))?;
let p =
NamedNode::new(p).map_err(|e| Error::new(&format!("Invalid predicate IRI: {}", e)))?;
let o = NamedNode::new(o).map_err(|e| Error::new(&format!("Invalid object IRI: {}", e)))?;
let g = GraphName::NamedNode(
NamedNode::new(graph_iri)
.map_err(|e| Error::new(&format!("Invalid graph IRI: {}", e)))?,
);
self.inner
.remove(&Quad::new(s, p, o, g))
.map_err(|e| Error::new(&format!("Failed to remove quad from named graph: {}", e)))?;
self.bump_epoch();
Ok(())
}
pub fn remove_quad_object(&self, quad: &Quad) -> Result<()> {
self.inner
.remove(quad)
.map_err(|e| Error::new(&format!("Failed to remove quad: {}", e)))?;
self.bump_epoch();
Ok(())
}
pub fn remove_for_pattern(
&self, s: Option<&NamedOrBlankNode>, p: Option<&NamedNode>, o: Option<&Term>,
g: Option<&GraphName>,
) -> Result<usize> {
let quads: Vec<Quad> = self
.inner
.quads_for_pattern(
s.map(|x| x.as_ref()),
p.map(|x| x.as_ref()),
o.map(|x| x.as_ref()),
g.map(|x| x.as_ref()),
)
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| Error::new(&format!("Failed to collect quads: {}", e)))?;
let count = quads.len();
for quad in &quads {
self.inner
.remove(quad)
.map_err(|e| Error::new(&format!("Failed to remove quad: {}", e)))?;
}
self.bump_epoch();
Ok(count)
}
pub fn quads(&self) -> impl Iterator<Item = Result<Quad>> + '_ {
self.inner
.quads_for_pattern(None, None, None, None)
.map(|r| r.map_err(|e| Error::new(&format!("Oxigraph error: {}", e))))
}
pub fn load_path<P: AsRef<Path>>(&self, path: P) -> Result<()> {
let path = path.as_ref();
let ext = path
.extension()
.and_then(|e| e.to_str())
.map(|s| s.to_ascii_lowercase())
.unwrap_or_default();
let fmt = match ext.as_str() {
"ttl" | "turtle" => RdfFormat::Turtle,
"nt" | "ntriples" => RdfFormat::NTriples,
"rdf" | "xml" => RdfFormat::RdfXml,
"trig" => RdfFormat::TriG,
"nq" | "nquads" => RdfFormat::NQuads,
other => return Err(Error::new(&format!("unsupported RDF format: {}", other))),
};
let file = File::open(path)?;
let reader = BufReader::new(file);
self.inner
.load_from_reader(fmt, reader)
.map_err(|e| Error::new(&format!("Failed to load RDF from file: {}", e)))?;
self.bump_epoch();
Ok(())
}
/// Runs `sparql` and returns a fully materialized, cacheable result.
///
/// Results are cached under `(query hash, epoch)`. The epoch is read once,
/// *before* the query executes. An entry stored under that key can then only
/// be served while the epoch is unchanged, and any later `bump_epoch` moves
/// lookups to a new key.
///
/// The query that is parsed and executed is always the caller's `sparql` text.
/// The plan cache is keyed by hash alone, so reading the text back from it
/// could run a different query if two query strings shared a hash. The plan
/// cache is still updated so it reflects the most recent text seen per hash.
///
/// NOTE(review): the result cache is likewise keyed by the 64-bit hash, so two
/// colliding queries would share a result entry. Fixing that needs a change to
/// the cache key type.
///
/// # Errors
/// * "Cache lock poisoned" if either cache mutex is poisoned.
/// * "SPARQL parse error: ..." / "SPARQL execution error: ..." from evaluation.
/// * Any error raised while materializing the results.
pub fn query_cached(&self, sparql: &str) -> Result<CachedResult> {
    let query_hash = self.hash_query(sparql);
    let cache_key = (query_hash, self.current_epoch());

    // Fast path: serve a materialized result for this exact (hash, epoch).
    // The guard lives only inside this block, so the lock is released before
    // the (potentially slow) query evaluation below.
    {
        let mut results = self
            .result_cache
            .lock()
            .map_err(|_| Error::new("Cache lock poisoned"))?;
        if let Some(hit) = results.get(&cache_key) {
            return Ok(hit.clone());
        }
    }

    // Record the query text for this hash; `get` also refreshes its LRU position.
    {
        let mut plans = self
            .plan_cache
            .lock()
            .map_err(|_| Error::new("Cache lock poisoned"))?;
        let is_recorded = plans
            .get(&query_hash)
            .map_or(false, |known| known.as_str() == sparql);
        if !is_recorded {
            plans.put(query_hash, sparql.to_string());
        }
    }

    let results = SparqlEvaluator::new()
        .parse_query(sparql)
        .map_err(|e| Error::new(&format!("SPARQL parse error: {}", e)))?
        .on_store(&self.inner)
        .execute()
        .map_err(|e| Error::new(&format!("SPARQL execution error: {}", e)))?;
    let materialized = self.materialize_results(results)?;

    self.result_cache
        .lock()
        .map_err(|_| Error::new("Cache lock poisoned"))?
        .put(cache_key, materialized.clone());
    Ok(materialized)
}
/// Runs `sparql` and returns live `QueryResults` borrowed from this graph.
///
/// The query first goes through `query_cached`. A boolean answer is returned
/// straight from that result. For solution or graph results the query is
/// evaluated again via `query_prepared` to obtain a live result.
///
/// NOTE(review): when `query_cached` misses, a non-boolean query is therefore
/// evaluated twice (once there, once here) — confirm this is intended.
///
/// # Errors
/// Propagates errors from `query_cached` and `query_prepared`.
pub fn query<'a>(&'a self, sparql: &str) -> Result<QueryResults<'a>> {
    if let CachedResult::Boolean(answer) = self.query_cached(sparql)? {
        return Ok(QueryResults::Boolean(answer));
    }
    self.query_prepared(sparql)
}
/// Runs `sparql` with a prolog built from `prefixes` and `base` placed in front.
///
/// The prolog comes from `crate::graph::build_prolog`. When it is empty the
/// query is executed unchanged; otherwise prolog and query are joined with a
/// newline.
///
/// # Errors
/// Propagates any error from `query`.
pub fn query_with_prolog<'a>(
    &'a self, sparql: &str, prefixes: &BTreeMap<String, String>, base: Option<&str>,
) -> Result<QueryResults<'a>> {
    let head = crate::graph::build_prolog(prefixes, base);
    if head.is_empty() {
        return self.query(sparql);
    }
    let full_query = format!("{head}\n{sparql}");
    self.query(&full_query)
}
pub fn query_prepared<'a>(&'a self, q: &str) -> Result<QueryResults<'a>> {
SparqlEvaluator::new()
.parse_query(q)
.map_err(|e| Error::new(&format!("SPARQL parse error: {}", e)))?
.on_store(&self.inner)
.execute()
.map_err(|e| Error::new(&format!("SPARQL execution error: {}", e)))
}
pub fn quads_for_pattern(
&self, s: Option<&NamedOrBlankNode>, p: Option<&NamedNode>, o: Option<&Term>,
g: Option<&GraphName>,
) -> Result<Vec<Quad>> {
self.inner
.quads_for_pattern(
s.map(|x| x.as_ref()),
p.map(|x| x.as_ref()),
o.map(|x| x.as_ref()),
g.map(|x| x.as_ref()),
)
.map(|r| r.map_err(|e| Error::new(&format!("Quad error: {}", e))))
.collect::<Result<Vec<_>>>()
}
pub fn clear(&self) -> Result<()> {
self.inner
.clear()
.map_err(|e| Error::new(&format!("Failed to clear graph: {}", e)))?;
self.bump_epoch();
Ok(())
}
/// Returns the number of quads in the store, or `0` if counting fails.
///
/// Use `len_result` when a counting failure must be told apart from an
/// empty store.
pub fn len(&self) -> usize {
    match self.len_result() {
        Ok(count) => count,
        Err(_) => 0,
    }
}
/// Returns the number of quads in the store.
///
/// # Errors
/// Returns the store's error, converted into this crate's `Error`.
pub fn len_result(&self) -> Result<usize> {
    let count = self.inner.len()?;
    Ok(count)
}
/// Returns `true` when the store holds no quads.
///
/// This asks the store directly via `Store::is_empty` instead of counting every
/// quad through `len()`. A store error is reported as "empty", matching the
/// previous behaviour where a failed count fell back to `0`.
pub fn is_empty(&self) -> bool {
    self.inner.is_empty().unwrap_or(true)
}
}
/// Cloning a `Graph` is shallow: only the `Arc` handles are cloned, so the clone
/// shares the same store, epoch counter, and caches as the original.
impl Clone for Graph {
    fn clone(&self) -> Self {
        let Self {
            inner,
            epoch,
            plan_cache,
            result_cache,
        } = self;
        Self {
            inner: Arc::clone(inner),
            epoch: Arc::clone(epoch),
            plan_cache: Arc::clone(plan_cache),
            result_cache: Arc::clone(result_cache),
        }
    }
}
/// Builds a SPARQL prolog from an optional base IRI and a set of prefixes.
///
/// The output is an optional `BASE <iri>` line followed by one
/// `PREFIX name: <iri>` line per entry, in the map's key order. Every line ends
/// with `\n`. With no base and no prefixes the result is the empty string.
/// Values are inserted verbatim, without escaping or validation.
pub fn build_prolog(prefixes: &BTreeMap<String, String>, base: Option<&str>) -> String {
    let base_line = base.map(|iri| format!("BASE <{}>\n", iri));
    let prefix_lines = prefixes
        .iter()
        .map(|(name, iri)| format!("PREFIX {}: <{}>\n", name, iri));
    base_line.into_iter().chain(prefix_lines).collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed graph holds no quads.
    #[test]
    fn test_graph_new() {
        let graph = Graph::new().unwrap();
        assert!(graph.is_empty());
        assert_eq!(graph.len(), 0);
    }

    /// Loading a one-triple Turtle document leaves exactly one quad in the graph.
    #[test]
    fn test_graph_insert_turtle() {
        let graph = Graph::new().unwrap();
        graph
            .insert_turtle(
                r#"
@prefix ex: <http://example.org/> .
ex:alice a ex:Person .
"#,
            )
            .unwrap();
        assert!(!graph.is_empty());
        // The document contains a single statement, so check the exact count
        // rather than asserting non-emptiness a second time.
        assert_eq!(graph.len(), 1);
    }

    /// `query_cached` returns materialized solutions keyed by variable name.
    #[test]
    fn test_graph_query_cached() {
        let graph = Graph::new().unwrap();
        graph
            .insert_turtle(
                r#"
@prefix ex: <http://example.org/> .
ex:alice a ex:Person ;
ex:name "Alice" .
"#,
            )
            .unwrap();
        let result = graph
            .query_cached("SELECT ?name WHERE { ?s <http://example.org/name> ?name }")
            .unwrap();
        match result {
            CachedResult::Solutions(rows) => {
                assert!(!rows.is_empty());
                assert!(rows[0].contains_key("name"));
            }
            _ => panic!("Expected solutions"),
        }
    }

    /// Every prefix appears as its own `PREFIX` declaration.
    #[test]
    fn test_build_prolog_with_prefixes() {
        let mut prefixes = BTreeMap::new();
        prefixes.insert("ex".to_string(), "http://example.org/".to_string());
        prefixes.insert(
            "rdf".to_string(),
            "http://www.w3.org/1999/02/22-rdf-syntax-ns#".to_string(),
        );
        let prolog = build_prolog(&prefixes, None);
        assert!(prolog.contains("PREFIX ex: <http://example.org/>"));
        assert!(prolog.contains("PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>"));
    }

    /// A base IRI is emitted as a `BASE` declaration.
    #[test]
    fn test_build_prolog_with_base() {
        let prefixes = BTreeMap::new();
        let prolog = build_prolog(&prefixes, Some("http://example.org/"));
        assert!(prolog.contains("BASE <http://example.org/>"));
    }

    /// Solution rows are keyed by bare variable names, without the `?` sigil.
    #[test]
    fn test_sparql_variables_stripped_of_question_mark() {
        let graph = Graph::new().unwrap();
        graph
            .insert_turtle(
                r#"
@prefix ex: <http://example.org/> .
ex:john ex:name "John" ;
ex:age "30" .
"#,
            )
            .unwrap();
        let query = r#"
PREFIX ex: <http://example.org/>
SELECT ?name ?age
WHERE {
?person ex:name ?name ;
ex:age ?age .
}
"#;
        let result = graph.query_cached(query).unwrap();
        if let CachedResult::Solutions(rows) = result {
            assert!(!rows.is_empty());
            let row = &rows[0];
            assert!(
                row.contains_key("name"),
                "Expected 'name' key without ? prefix. Got keys: {:?}",
                row.keys().collect::<Vec<_>>()
            );
            assert!(
                row.contains_key("age"),
                "Expected 'age' key without ? prefix. Got keys: {:?}",
                row.keys().collect::<Vec<_>>()
            );
            assert!(
                !row.contains_key("?name"),
                "Should not have '?name' key with ? prefix"
            );
            assert!(
                !row.contains_key("?age"),
                "Should not have '?age' key with ? prefix"
            );
        } else {
            panic!("Expected Solutions result type");
        }
    }
}