use super::RdfDataError;
use colored::*;
use oxigraph::sparql::{QueryResults, SparqlEvaluator};
use oxigraph::store::Store;
use oxrdf::{
BlankNode as OxBlankNode, Literal as OxLiteral, NamedNode as OxNamedNode, NamedOrBlankNode as OxSubject,
Term as OxTerm, Triple as OxTriple,
};
use prefixmap::PrefixMap;
use rudof_iri::IriS;
use rudof_rdf::{
rdf_core::{
BuildRDF, FocusRDF, Matcher, NeighsRDF, RDFFormat, Rdf, RdfDataConfig,
query::{QueryRDF, QueryResultFormat, QuerySolution, QuerySolutions, VarName},
},
rdf_impl::{InMemoryGraph, ReaderMode, SparqlEndpoint},
};
use serde::Serialize;
use serde::ser::SerializeStruct;
use sparesults::QuerySolution as SparQuerySolution;
use std::collections::HashMap;
use std::fmt::Debug;
use std::str::FromStr;
use std::{io, iter};
use tracing::trace;
/// RDF data made of an optional in-memory graph plus a collection of named SPARQL endpoints.
#[derive(Clone)]
pub struct RdfData {
/// Focus term, written by `set_focus` and read back by `get_focus`.
focus: Option<OxTerm>,
/// Every endpoint registered under a name (see `add_endpoint`).
endpoints: HashMap<String, SparqlEndpoint>,
/// Endpoints that are actually consulted by queries, triple matching and prefix handling.
use_endpoints: HashMap<String, SparqlEndpoint>,
/// In-memory graph; `None` when no graph data is held.
graph: Option<InMemoryGraph>,
/// Oxigraph store built on demand from `graph` by `check_store`; `None` until then.
store: Option<Store>,
}
impl RdfData {
/// Creates an empty `RdfData`: no graph, no store, no focus term and no endpoints.
pub fn new() -> RdfData {
    let endpoints = HashMap::new();
    let use_endpoints = HashMap::new();
    RdfData {
        focus: None,
        endpoints,
        use_endpoints,
        graph: None,
        store: None,
    }
}
/// Drops the focus term, the store, the graph and the set of endpoints in use.
///
/// The map of registered `endpoints` is left untouched.
pub fn reset(&mut self) {
    self.focus = None;
    self.store = None;
    self.graph = None;
    self.use_endpoints.clear();
}
/// Registers every endpoint described in `rdf_data_config` and returns the updated value.
///
/// # Errors
///
/// Returns `RdfDataError::SRDFSparqlFromEndpointDescriptionError` (carrying the endpoint
/// name and query URL) when a `SparqlEndpoint` cannot be created from its description.
pub fn with_rdf_data_config(mut self, rdf_data_config: &RdfDataConfig) -> Result<Self, RdfDataError> {
    // Nothing to register when the configuration declares no endpoints.
    let Some(descriptions) = &rdf_data_config.endpoints else {
        return Ok(self);
    };
    for (name, description) in descriptions.iter() {
        let created = SparqlEndpoint::new(description.query_url(), &description.prefixmap());
        let sparql_endpoint = created.map_err(|e| RdfDataError::SRDFSparqlFromEndpointDescriptionError {
            name: name.clone(),
            url: description.query_url().to_string(),
            err: Box::new(e),
        })?;
        self.add_endpoint(name, sparql_endpoint);
    }
    Ok(self)
}
/// Makes sure a store exists for the in-memory graph, building it on first use.
///
/// Does nothing when there is no graph. When a store is already present it is kept
/// as is: this method never rebuilds an existing store.
///
/// # Errors
///
/// Propagates failures from creating the store, loading the graph quads or committing them.
pub fn check_store(&mut self) -> Result<(), RdfDataError> {
    let Some(graph) = &self.graph else {
        return Ok(());
    };
    trace!("Checking RDF store, graph exists, length: {}", graph.len());
    if self.store.is_some() {
        return Ok(());
    }
    trace!("Initializing RDF store from in-memory graph");
    let store = Store::new()?;
    let mut loader = store.bulk_loader();
    loader.load_quads(graph.quads())?;
    loader.commit()?;
    self.store = Some(store);
    trace!(
        "RDF store initialized with length: {:?}",
        self.store.as_ref().map(|s| s.len())
    );
    Ok(())
}
pub fn from_graph(graph: InMemoryGraph) -> Result<RdfData, RdfDataError> {
Ok(RdfData {
endpoints: HashMap::new(),
graph: Some(graph),
store: None,
focus: None,
use_endpoints: HashMap::new(),
})
}
/// Removes all registered endpoints, all endpoints in use, the graph and the store.
///
/// The focus term is not modified.
pub fn clean_all(&mut self) {
    self.endpoints.clear();
    self.use_endpoints.clear();
    self.graph = None;
    // The store is only ever built from `graph` (see `check_store`); keeping it after the
    // graph is gone would let `query_select` answer from data that no longer exists.
    self.store = None;
}
pub fn graph(&self) -> Option<&InMemoryGraph> {
self.graph.as_ref()
}
pub fn graph_prefixmap(&self) -> PrefixMap {
self.graph.as_ref().map(|g| g.prefixmap().clone()).unwrap_or_default()
}
/// Removes the in-memory graph together with the store that was derived from it.
///
/// Endpoints and the focus term are not modified.
pub fn clean_graph(&mut self) {
    self.graph = None;
    // `check_store` only builds the store from `graph`; once the graph is removed the
    // store would otherwise keep serving the deleted triples to `query_select`.
    self.store = None;
}
pub fn from_str(
data: &str,
format: &RDFFormat,
base: Option<&str>,
reader_mode: &ReaderMode,
) -> Result<RdfData, RdfDataError> {
let mut rdf_data = Self::new();
rdf_data.merge_from_reader(&mut data.as_bytes(), "String", format, base, reader_mode)?;
Ok(rdf_data)
}
/// Reads RDF from `read` and merges it into the in-memory graph.
///
/// When there is no graph yet, a new one is created and only installed if reading succeeds.
/// Any previously built store is discarded because it no longer reflects the graph; call
/// `check_store` again before querying the store.
///
/// # Errors
///
/// Returns `RdfDataError::SRDFGraphError` wrapping the error reported by the graph reader.
pub fn merge_from_reader<R: io::Read>(
    &mut self,
    read: &mut R,
    source_name: &str,
    format: &RDFFormat,
    base: Option<&str>,
    reader_mode: &ReaderMode,
) -> Result<(), RdfDataError> {
    // `check_store` never rebuilds an existing store, so it has to be invalidated here;
    // otherwise queries would keep running against the pre-merge contents.
    self.store = None;
    match &mut self.graph {
        Some(graph) => graph
            .merge_from_reader(read, source_name, format, base, reader_mode)
            .map_err(|e| RdfDataError::SRDFGraphError { err: Box::new(e) }),
        None => {
            let mut graph = InMemoryGraph::new();
            graph
                .merge_from_reader(read, source_name, format, base, reader_mode)
                .map_err(|e| RdfDataError::SRDFGraphError { err: Box::new(e) })?;
            self.graph = Some(graph);
            Ok(())
        },
    }
}
/// Creates an `RdfData` with a single endpoint that is both registered and in use.
pub fn from_endpoint(name: &str, endpoint: SparqlEndpoint) -> RdfData {
    let registered = HashMap::from([(name.to_string(), endpoint.clone())]);
    let in_use = HashMap::from([(name.to_string(), endpoint.clone())]);
    RdfData {
        focus: None,
        endpoints: registered,
        use_endpoints: in_use,
        graph: None,
        store: None,
    }
}
/// Registers `endpoint` under `name`, replacing any endpoint previously registered with that name.
///
/// The endpoint is only registered; it is not added to the endpoints in use.
pub fn add_endpoint(&mut self, name: &str, endpoint: SparqlEndpoint) {
    // `insert` overwrites an existing value, which is all the former
    // `entry().and_modify().or_insert()` chain did, without cloning the endpoint.
    self.endpoints.insert(name.to_string(), endpoint);
}
pub fn use_endpoints(&self) -> &HashMap<String, SparqlEndpoint> {
&self.use_endpoints
}
pub fn endpoints(&self) -> &HashMap<String, SparqlEndpoint> {
&self.endpoints
}
/// Marks `endpoint` as in use under `name`, replacing any entry already in use with that name.
pub fn use_endpoint(&mut self, name: &str, endpoint: SparqlEndpoint) {
    let key = name.to_string();
    let _previous = self.use_endpoints.insert(key, endpoint);
}
/// Stops using the endpoint called `name`; does nothing if it was not in use.
///
/// The endpoint stays in the map of registered endpoints.
pub fn dont_use_endpoint(&mut self, name: &str) {
    let _removed = self.use_endpoints.remove(name);
}
/// Iterates over the endpoints in use as `(name, endpoint)` pairs.
pub fn endpoints_to_use(&self) -> impl Iterator<Item = (&str, &SparqlEndpoint)> {
    let selected = self.use_endpoints.iter();
    selected.map(|entry| {
        let (name, endpoint) = entry;
        (name.as_str(), endpoint)
    })
}
pub fn show_blanknode(&self, bn: &OxBlankNode) -> String {
let str: String = format!("{bn}");
format!("{}", str.green())
}
/// Renders a literal, colored red.
///
/// * plain literal: `"value"`
/// * typed literal: `"value"^^datatype`, with the datatype passed through `qualify_iri`
/// * language-tagged literal: `"value"@lang`
/// * directional language-tagged literal: `"value"@lang--direction`
///
/// The lexical value is written as is, without escaping.
pub fn show_literal(&self, lit: &OxLiteral) -> String {
    let rendered = match lit.clone().destruct() {
        (value, None, None, None) => format!("\"{}\"", value),
        (value, Some(dt), None, None) => format!("\"{}\"^^{}", value, self.qualify_iri(&dt)),
        (value, _, Some(lang), None) => format!("\"{}\"@{}", value, lang),
        // RDF 1.2 separates the language tag from the base direction with `--`.
        (value, _, Some(lang), Some(direction)) => {
            format!("\"{}\"@{}--{}", value, lang, direction)
        },
        // A direction without a language tag is not expected; fall back to the literal's
        // own `Display` output rather than panicking while producing display text.
        (_, _, None, Some(_)) => format!("{lit}"),
    };
    format!("{}", rendered.red())
}
/// Writes the graph (if any) in `format`, followed by one `Endpoint name: iri` line for
/// each endpoint in use.
///
/// # Errors
///
/// Returns `RdfDataError::Serializing` when the graph cannot be serialized, or the
/// error converted from a failed write of an endpoint line.
pub fn serialize<W: io::Write>(&self, format: &RDFFormat, writer: &mut W) -> Result<(), RdfDataError> {
    if let Some(graph) = self.graph.as_ref() {
        let outcome = BuildRDF::serialize(graph, format, writer);
        outcome.map_err(|e| RdfDataError::Serializing {
            format: *format,
            error: e.to_string(),
        })?;
    }
    for (name, endpoint) in &self.use_endpoints {
        writeln!(writer, "Endpoint {}: {}", name, endpoint.iri())?;
    }
    Ok(())
}
/// Looks up a registered endpoint by name and returns a clone of it.
pub fn find_endpoint(&self, name: &str) -> Option<SparqlEndpoint> {
    let found = self.endpoints.get(name)?;
    Some(found.clone())
}
pub fn all_triples(&self) -> Result<impl Iterator<Item = OxTriple>, RdfDataError> {
let graph_triples = self.graph.iter().flat_map(NeighsRDF::triples).flatten();
let endpoints_triples = self
.use_endpoints
.values()
.flat_map(|e| NeighsRDF::triples(e))
.flatten();
Ok(graph_triples.chain(endpoints_triples))
}
}
/// Serializes `RdfData` as a struct with two fields: `endpoints` and `graph`.
///
/// The endpoints in use, the store and the focus term are not part of the output.
impl Serialize for RdfData {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        const FIELD_COUNT: usize = 2;
        let mut rdf_data = serializer.serialize_struct("RdfData", FIELD_COUNT)?;
        rdf_data.serialize_field("endpoints", &self.endpoints)?;
        rdf_data.serialize_field("graph", &self.graph)?;
        rdf_data.end()
    }
}
/// Debug output shows only the registered endpoints and the graph.
impl Debug for RdfData {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("RdfData");
        builder.field("endpoints", &self.endpoints);
        builder.field("graph", &self.graph);
        builder.finish()
    }
}
impl Default for RdfData {
fn default() -> Self {
Self::new()
}
}
/// `Rdf` implementation based on the oxrdf term types.
///
/// Prefix handling uses the graph's prefix map when a graph is present and otherwise
/// falls back to the prefix maps of the endpoints in use.
impl Rdf for RdfData {
    type IRI = OxNamedNode;
    type BNode = OxBlankNode;
    type Literal = OxLiteral;
    type Subject = OxSubject;
    type Term = OxTerm;
    type Triple = OxTriple;
    type Err = RdfDataError;

    /// Returns the graph's prefix map; without a graph, the merged prefix maps of the
    /// endpoints in use; `None` when there is neither a graph nor an endpoint in use.
    fn prefixmap(&self) -> Option<PrefixMap> {
        if let Some(graph) = &self.graph {
            return Some(graph.prefixmap().clone());
        }
        if self.use_endpoints.is_empty() {
            return None;
        }
        let mut merged = PrefixMap::new();
        for endpoint in self.use_endpoints.values() {
            merged.merge(endpoint.prefixmap().clone());
        }
        Some(merged)
    }

    /// Renders an IRI in qualified form.
    ///
    /// With a graph, its prefix map does the qualification. Without one, the first
    /// endpoint in use whose prefix map can qualify the IRI wins. If nothing applies,
    /// or the IRI cannot be converted to an `IriS`, the node's `Display` form is returned.
    fn qualify_iri(&self, node: &Self::IRI) -> String {
        // A display helper should not panic: fall back to the unqualified form.
        let Ok(iri) = IriS::from_str(node.as_str()) else {
            return format!("{node}");
        };
        if let Some(graph) = &self.graph {
            return graph.prefixmap().qualify(&iri);
        }
        self.use_endpoints
            .values()
            .find_map(|endpoint| endpoint.prefixmap().qualify_optional(&iri))
            .unwrap_or_else(|| format!("{node}"))
    }

    /// Renders a subject: blank nodes via `show_blanknode`, IRIs via `qualify_iri`.
    fn qualify_subject(&self, subj: &Self::Subject) -> String {
        match subj {
            OxSubject::NamedNode(iri) => self.qualify_iri(iri),
            OxSubject::BlankNode(bnode) => self.show_blanknode(bnode),
        }
    }

    /// Renders a term: IRIs via `qualify_iri`, blank nodes via `show_blanknode`,
    /// literals via `show_literal`. Triple terms are rendered with their `Display` form.
    fn qualify_term(&self, term: &Self::Term) -> String {
        match term {
            OxTerm::NamedNode(iri) => self.qualify_iri(iri),
            OxTerm::BlankNode(bnode) => self.show_blanknode(bnode),
            OxTerm::Literal(literal) => self.show_literal(literal),
            // No dedicated rendering for triple terms yet; use the plain form
            // instead of panicking.
            OxTerm::Triple(_) => format!("{term}"),
        }
    }

    /// Resolves `prefix:local` to an IRI.
    ///
    /// With a graph, only the graph's prefix map is consulted. Without one, the first
    /// endpoint in use that resolves the pair wins.
    ///
    /// # Errors
    ///
    /// Returns the graph prefix map's error, or `PrefixNotFound` (with an empty prefix
    /// map attached) when there is no graph and no endpoint in use resolves the prefix.
    fn resolve_prefix_local(&self, prefix: &str, local: &str) -> Result<IriS, prefixmap::error::PrefixMapError> {
        if let Some(graph) = self.graph() {
            let iri = graph.prefixmap().resolve_prefix_local(prefix, local)?;
            return Ok(iri.clone());
        }
        for endpoint in self.use_endpoints.values() {
            if let Ok(iri) = endpoint.prefixmap().resolve_prefix_local(prefix, local) {
                return Ok(iri.clone());
            }
        }
        Err(prefixmap::error::PrefixMapError::PrefixNotFound {
            prefix: prefix.to_string(),
            prefixmap: PrefixMap::new(),
        })
    }
}
impl QueryRDF for RdfData {
fn query_construct(&self, query_str: &str, format: &QueryResultFormat) -> Result<String, RdfDataError>
where
Self: Sized,
{
let mut str = String::new();
if let Some(_store) = &self.store {
tracing::debug!("Querying in-memory store (we ignore it by now");
}
for (_name, endpoint) in self.endpoints_to_use() {
let new_str = endpoint.query_construct(query_str, format)?;
str.push_str(&new_str);
}
Ok(str)
}
fn query_select(&self, query_str: &str) -> Result<QuerySolutions<RdfData>, RdfDataError>
where
Self: Sized,
{
let mut sols: QuerySolutions<RdfData> = QuerySolutions::empty();
if let Some(store) = &self.store {
trace!("Querying in-memory store of length: {}", store.len()?);
let new_sol = SparqlEvaluator::new()
.parse_query(query_str)?
.on_store(store)
.execute()?;
trace!("Got results from in-memory store");
let sol = cnv_query_results(new_sol)?;
sols.extend(sol, self.graph_prefixmap());
} else {
trace!("No in-memory store to query");
}
for (_, endpoint) in self.endpoints_to_use() {
let new_sols = endpoint.query_select(query_str)?;
let new_sols_converted: Vec<QuerySolution<RdfData>> = new_sols.iter().map(cnv_sol).collect();
sols.extend(new_sols_converted, endpoint.prefixmap().clone());
}
Ok(sols)
}
fn query_ask(&self, _query: &str) -> Result<bool, Self::Err> {
todo!()
}
}
/// Re-types an endpoint query solution as an `RdfData` solution by cloning each term.
fn cnv_sol(sol: &QuerySolution<SparqlEndpoint>) -> QuerySolution<RdfData> {
    sol.convert(|term| term.clone())
}
/// Converts store query results into `RdfData` solutions.
///
/// Only `QueryResults::Solutions` produces entries; any other kind of result yields an
/// empty vector.
///
/// # Errors
///
/// Returns the converted error of the first solution that fails to evaluate.
fn cnv_query_results(query_results: QueryResults) -> Result<Vec<QuerySolution<RdfData>>, RdfDataError> {
    let mut converted = Vec::new();
    let QueryResults::Solutions(solutions) = query_results else {
        return Ok(converted);
    };
    trace!("Converting query solutions");
    for (index, solution) in solutions.enumerate() {
        // Solutions are numbered from 1 in the log output.
        let counter = index + 1;
        trace!("Converting solution {counter}");
        converted.push(cnv_query_solution(solution?));
    }
    Ok(converted)
}
/// Builds an `RdfData` query solution from a `sparesults` solution, copying the
/// variable names and cloning the bound values in order.
fn cnv_query_solution(qs: SparQuerySolution) -> QuerySolution<RdfData> {
    let variables: Vec<_> = qs
        .variables()
        .iter()
        .map(|variable| VarName::new(variable.as_str()))
        .collect();
    let values: Vec<_> = qs.values().iter().cloned().collect();
    QuerySolution::new(variables, values)
}
/// Triple access over the in-memory graph and, for pattern matching, the endpoints in use.
impl NeighsRDF for RdfData {
    /// Returns the triples of the in-memory graph, or an empty iterator without a graph.
    ///
    /// Endpoints are not consulted by this method.
    ///
    /// # Errors
    ///
    /// Returns `RdfDataError::SRDFGraphError` when the graph cannot provide its triples.
    fn triples(&self) -> Result<impl Iterator<Item = Self::Triple>, Self::Err> {
        let from_graph: Box<dyn Iterator<Item = OxTriple> + '_> = match &self.graph {
            Some(graph) => {
                let found = graph
                    .triples()
                    .map_err(|e| RdfDataError::SRDFGraphError { err: Box::new(e) })?;
                Box::new(found)
            },
            None => Box::new(iter::empty()),
        };
        Ok(from_graph)
    }

    /// Returns the triples matching the given subject, predicate and object matchers:
    /// first those of the graph, then those of every endpoint in use.
    ///
    /// Endpoint matches are collected into a vector before the iterator is returned.
    ///
    /// # Errors
    ///
    /// Returns `RdfDataError::SRDFGraphError` for a graph failure, or the converted
    /// endpoint error for the first endpoint that fails.
    fn triples_matching<S, P, O>(
        &self,
        subject: &S,
        predicate: &P,
        object: &O,
    ) -> Result<impl Iterator<Item = Self::Triple> + '_, Self::Err>
    where
        S: Matcher<Self::Subject>,
        P: Matcher<Self::IRI>,
        O: Matcher<Self::Term>,
    {
        let from_graph: Box<dyn Iterator<Item = OxTriple> + '_> = match &self.graph {
            Some(graph) => {
                let found = graph
                    .triples_matching(subject, predicate, object)
                    .map_err(|e| RdfDataError::SRDFGraphError { err: Box::new(e) })?;
                Box::new(found)
            },
            None => Box::new(iter::empty()),
        };
        let mut from_endpoints: Vec<OxTriple> = Vec::new();
        for endpoint in self.use_endpoints.values() {
            let matches = endpoint.triples_matching(subject, predicate, object)?;
            from_endpoints.extend(matches);
        }
        Ok(from_graph.chain(from_endpoints))
    }
}
/// Focus-term handling backed by the `focus` field.
impl FocusRDF for RdfData {
    /// Stores a clone of `focus` as the current focus term.
    fn set_focus(&mut self, focus: &Self::Term) {
        let new_focus = focus.clone();
        self.focus = Some(new_focus);
    }

    /// Returns the current focus term, if one has been set.
    fn get_focus(&self) -> Option<&Self::Term> {
        let Self { focus, .. } = self;
        focus.as_ref()
    }
}
/// Graph-building operations, all of which act on the in-memory graph.
///
/// Operations that change the graph's triples discard the store, because `check_store`
/// never rebuilds an existing one; call `check_store` again before querying.
impl BuildRDF for RdfData {
    /// Returns an empty `RdfData` (same as `RdfData::new`).
    fn empty() -> Self {
        RdfData::new()
    }

    /// Sets the base IRI on the graph; does nothing when there is no graph.
    fn add_base(&mut self, base: &Option<IriS>) {
        if let Some(graph) = self.graph.as_mut() {
            graph.add_base(base);
        }
    }

    /// Adds a prefix declaration to the graph; does nothing when there is no graph.
    fn add_prefix(&mut self, alias: &str, iri: &IriS) {
        if let Some(graph) = self.graph.as_mut() {
            graph.add_prefix(alias, iri);
        }
    }

    /// Replaces the graph's prefix map; does nothing when there is no graph.
    fn set_prefix_map(&mut self, prefix_map: PrefixMap) {
        if let Some(graph) = self.graph.as_mut() {
            graph.set_prefix_map(prefix_map);
        }
    }

    /// Merges `prefix_map` into the graph's prefix map; does nothing when there is no graph.
    fn merge_prefixes(&mut self, prefix_map: PrefixMap) {
        if let Some(graph) = self.graph.as_mut() {
            graph.merge_prefixes(prefix_map);
        }
    }

    /// Adds a triple to the graph, creating the graph first when there is none.
    ///
    /// A newly created graph is only installed if the insertion succeeds.
    ///
    /// # Errors
    ///
    /// Returns `RdfDataError::SRDFGraphError` when the graph rejects the triple.
    fn add_triple<S, P, O>(&mut self, subj: S, pred: P, obj: O) -> Result<(), Self::Err>
    where
        S: Into<Self::Subject>,
        P: Into<Self::IRI>,
        O: Into<Self::Term>,
    {
        match &mut self.graph {
            Some(graph) => graph
                .add_triple(subj, pred, obj)
                .map_err(|e| RdfDataError::SRDFGraphError { err: Box::new(e) })?,
            None => {
                let mut graph = InMemoryGraph::new();
                graph
                    .add_triple(subj, pred, obj)
                    .map_err(|e| RdfDataError::SRDFGraphError { err: Box::new(e) })?;
                self.graph = Some(graph);
            },
        }
        // The graph changed, so a store built earlier is out of date.
        self.store = None;
        Ok(())
    }

    /// Removes a triple from the graph; does nothing when there is no graph.
    ///
    /// # Errors
    ///
    /// Returns `RdfDataError::SRDFGraphError` when the graph reports a failure.
    fn remove_triple<S, P, O>(&mut self, subj: S, pred: P, obj: O) -> Result<(), Self::Err>
    where
        S: Into<Self::Subject>,
        P: Into<Self::IRI>,
        O: Into<Self::Term>,
    {
        let Some(graph) = self.graph.as_mut() else {
            return Ok(());
        };
        graph
            .remove_triple(subj, pred, obj)
            .map_err(|e| RdfDataError::SRDFGraphError { err: Box::new(e) })?;
        // The graph changed, so a store built earlier is out of date.
        self.store = None;
        Ok(())
    }

    /// Adds a type statement for `node` to the graph, creating the graph first when there
    /// is none (as `add_triple` does) instead of silently discarding the statement.
    ///
    /// # Errors
    ///
    /// Returns `RdfDataError::SRDFGraphError` when the graph rejects the statement.
    fn add_type<S, T>(&mut self, node: S, type_: T) -> Result<(), Self::Err>
    where
        S: Into<Self::Subject>,
        T: Into<Self::Term>,
    {
        match &mut self.graph {
            Some(graph) => graph
                .add_type(node, type_)
                .map_err(|e| RdfDataError::SRDFGraphError { err: Box::new(e) })?,
            None => {
                let mut graph = InMemoryGraph::new();
                graph
                    .add_type(node, type_)
                    .map_err(|e| RdfDataError::SRDFGraphError { err: Box::new(e) })?;
                self.graph = Some(graph);
            },
        }
        // The graph changed, so a store built earlier is out of date.
        self.store = None;
        Ok(())
    }

    /// Writes the graph (if any) in `format`, followed by one `Endpoint name: iri` line
    /// for each registered endpoint.
    ///
    /// NOTE(review): the inherent `RdfData::serialize` lists the endpoints in use while
    /// this one lists all registered endpoints; confirm which is intended.
    ///
    /// # Errors
    ///
    /// Returns `RdfDataError::Serializing` when the graph cannot be serialized, or the
    /// error converted from a failed write of an endpoint line.
    fn serialize<W: std::io::Write>(&self, format: &RDFFormat, writer: &mut W) -> Result<(), Self::Err> {
        if let Some(graph) = &self.graph {
            BuildRDF::serialize(graph, format, writer).map_err(|e| RdfDataError::Serializing {
                format: *format,
                error: format!("{e}"),
            })?;
        }
        for (name, endpoint) in &self.endpoints {
            writeln!(writer, "Endpoint {}: {}", name, endpoint.iri())?;
        }
        Ok(())
    }

    /// Creates a fresh blank node in the graph.
    ///
    /// # Errors
    ///
    /// Returns `RdfDataError::BNodeNoGraph` when there is no graph, or
    /// `RdfDataError::SRDFGraphError` when the graph fails to create the node.
    fn add_bnode(&mut self) -> Result<Self::BNode, Self::Err> {
        let Some(graph) = self.graph.as_mut() else {
            return Err(RdfDataError::BNodeNoGraph);
        };
        graph
            .add_bnode()
            .map_err(|e| RdfDataError::SRDFGraphError { err: Box::new(e) })
    }
}
#[cfg(test)]
mod tests {
    use rudof_iri::iri;
    use super::*;

    /// Parsing a single N-Triples statement yields a graph holding exactly one triple.
    #[test]
    fn test_rdf_data_from_str() {
        let ntriples = "<http://example.org/subject> <http://example.org/predicate> <http://example.org/object> .";
        let parsed = RdfData::from_str(ntriples, &RDFFormat::NTriples, None, &ReaderMode::Lax);
        assert!(parsed.is_ok());
        let graph = parsed.unwrap().graph;
        assert!(graph.is_some());
        let triple_count = graph.unwrap().triples().unwrap().count();
        assert_eq!(triple_count, 1);
    }

    /// Adding one triple to an empty `RdfData` creates a graph holding exactly that triple.
    #[test]
    fn test_build_rdf_data() {
        let mut rdf_data = RdfData::new();
        let example_ns = IriS::from_str("http://example.org/").unwrap();
        rdf_data.add_prefix("ex", &example_ns);
        let added = rdf_data.add_triple(
            iri!("http://example.org/alice"),
            iri!("http://example.org/knows"),
            iri!("http://example.org/bob"),
        );
        added.unwrap();
        let graph = rdf_data.graph;
        assert!(graph.is_some());
        let triple_count = graph.unwrap().triples().unwrap().count();
        assert_eq!(triple_count, 1);
    }
}