use std::sync::Arc;
use grafeo_common::utils::error::Result;
use grafeo_core::graph::rdf::RdfStore;
use super::GrafeoDB;
impl GrafeoDB {
    /// Runs a SPARQL query against this database's RDF store.
    ///
    /// The query text is translated into a logical plan, optimized using
    /// statistics obtained from the RDF store, lowered to a physical plan by
    /// the RDF planner, and finally executed.
    ///
    /// When the optimized plan has its `explain` flag set, nothing is
    /// executed; the value produced by `explain_result` for that plan is
    /// returned instead.
    ///
    /// This method is only compiled with the `sparql` feature. When the `wal`
    /// feature is also enabled, the planner is given a clone of this
    /// database's optional `wal` handle.
    ///
    /// # Errors
    ///
    /// Propagates any error raised by translation, optimization, physical
    /// planning, or execution.
    #[cfg(feature = "sparql")]
    pub fn execute_sparql(&self, query: &str) -> Result<super::QueryResult> {
        use crate::query::Executor;
        use crate::query::optimizer::Optimizer;
        use crate::query::planner::rdf::RdfPlanner;
        use crate::query::processor::explain_result;
        use crate::query::translators::sparql;

        // Query text -> logical plan.
        let parsed = sparql::translate(query)?;

        // The optimizer takes the statistics by value, so hand it a clone of
        // what the store returns.
        let statistics = (*self.rdf_store.get_or_collect_statistics()).clone();
        let plan = Optimizer::from_rdf_statistics(statistics).optimize(parsed)?;

        // Plans flagged as `explain` are described rather than run.
        if plan.explain {
            return Ok(explain_result(&plan));
        }

        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
        #[cfg(feature = "wal")]
        let planner = planner.with_wal(self.wal.as_ref().map(Arc::clone));

        let mut physical = planner.plan(&plan)?;

        // Copy the column list out first; the operator is borrowed mutably
        // for the duration of execution.
        let columns = physical.columns.clone();
        let runner = Executor::with_columns(columns);
        runner.execute(physical.operator.as_mut())
    }

    /// Returns a reference to the shared handle of the underlying RDF store.
    #[must_use]
    pub fn rdf_store(&self) -> &Arc<RdfStore> {
        &self.rdf_store
    }
}
impl GrafeoDB {
    /// Inserts a batch of RDF triples into this database's RDF store.
    ///
    /// The triples are forwarded unchanged to the store's `batch_insert`, and
    /// the `usize` it yields is returned as-is.
    ///
    /// NOTE(review): presumably the returned value is the number of triples
    /// actually inserted; confirm against `RdfStore::batch_insert`.
    pub fn batch_insert_rdf(
        &self,
        triples: impl IntoIterator<Item = grafeo_core::graph::rdf::Triple>,
    ) -> usize {
        let store = &self.rdf_store;
        store.batch_insert(triples)
    }
}
impl GrafeoDB {
#[must_use]
pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
let stats = self.rdf_store.stats();
let predicates = self
.rdf_store
.predicates()
.into_iter()
.map(|predicate| {
let count = self.rdf_store.triples_with_predicate(&predicate).len();
crate::admin::PredicateInfo {
iri: predicate.to_string(),
count,
}
})
.collect();
crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
predicates,
named_graphs: Vec::new(),
subject_count: stats.subject_count,
object_count: stats.object_count,
})
}
}
/// Applies a single WAL record to the RDF store during replay.
///
/// RDF record kinds handled here:
///
/// - `InsertRdfTriple` / `DeleteRdfTriple`: subject, predicate and object are
///   each parsed with `Term::from_ntriples`. Only when all three parse is the
///   triple inserted into, or removed from, the target store. The target is
///   the result of `graph_or_create` when `graph` is `Some`, otherwise
///   `rdf_store` itself.
/// - `ClearRdfGraph`: forwards to `clear_graph` with the optional graph name.
/// - `CreateRdfGraph`: calls `create_graph` and discards its result.
/// - `DropRdfGraph`: calls `drop_graph` when a name is given, otherwise
///   calls `clear` on the store.
///
/// Every other record kind is ignored.
///
/// NOTE(review): a record whose terms fail to parse is skipped with no
/// diagnostic, since this function returns nothing; confirm that is the
/// intended recovery behavior.
#[cfg(feature = "wal")]
pub(super) fn replay_rdf_wal_record(
    rdf_store: &Arc<RdfStore>,
    record: &grafeo_adapters::storage::wal::WalRecord,
) {
    use grafeo_adapters::storage::wal::WalRecord;
    use grafeo_core::graph::rdf::{Term, Triple};

    match record {
        // Insert and delete carry the same payload and differ only in the
        // final store operation, so they share one arm.
        WalRecord::InsertRdfTriple {
            subject,
            predicate,
            object,
            graph,
        }
        | WalRecord::DeleteRdfTriple {
            subject,
            predicate,
            object,
            graph,
        } => {
            let inserting = matches!(record, WalRecord::InsertRdfTriple { .. });

            // All three terms are parsed before any of them is inspected.
            let terms = Term::from_ntriples(subject)
                .zip(Term::from_ntriples(predicate))
                .zip(Term::from_ntriples(object));

            if let Some(((s, p), o)) = terms {
                let triple = Triple::new(s, p, o);

                // The target graph is resolved only after a successful parse.
                // NOTE(review): deletes also go through `graph_or_create`,
                // which presumably creates the graph when absent; verify.
                let target = match graph {
                    None => Arc::clone(rdf_store),
                    Some(name) => rdf_store.graph_or_create(name),
                };

                if inserting {
                    target.insert(triple);
                } else {
                    target.remove(&triple);
                }
            }
        }
        WalRecord::ClearRdfGraph { graph } => {
            rdf_store.clear_graph(graph.as_deref());
        }
        WalRecord::CreateRdfGraph { name } => {
            // The outcome of graph creation is deliberately discarded.
            let _ = rdf_store.create_graph(name);
        }
        WalRecord::DropRdfGraph { name } => {
            if let Some(graph_name) = name {
                rdf_store.drop_graph(graph_name);
            } else {
                // No name: the store itself is cleared.
                rdf_store.clear();
            }
        }
        // Any other record kind is not handled by this function.
        _ => {}
    }
}