use std::collections::BTreeSet;
use oxirs_core::rdf_store::Store;
use tracing::{info, span, Level};
use crate::model::{StarGraph, StarTerm, StarTriple};
use crate::store::conversion;
use crate::store::StarStore;
use crate::{StarError, StarResult};
impl StarStore {
pub fn query(
&self,
subject: Option<&StarTerm>,
predicate: Option<&StarTerm>,
object: Option<&StarTerm>,
) -> StarResult<Vec<StarTriple>> {
let mut results = Vec::new();
let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
for triple in star_triples.iter() {
let matches = (subject.is_none() || subject == Some(&triple.subject))
&& (predicate.is_none() || predicate == Some(&triple.predicate))
&& (object.is_none() || object == Some(&triple.object));
if matches {
results.push(triple.clone());
}
}
if subject.map_or(true, |s| !matches!(s, StarTerm::QuotedTriple(_)))
&& predicate.map_or(true, |p| !matches!(p, StarTerm::QuotedTriple(_)))
&& object.map_or(true, |o| !matches!(o, StarTerm::QuotedTriple(_)))
{
let all = self.all_triples();
for triple in all {
if !triple.contains_quoted_triples() {
let matches = (subject.is_none() || subject == Some(&triple.subject))
&& (predicate.is_none() || predicate == Some(&triple.predicate))
&& (object.is_none() || object == Some(&triple.object));
if matches && !results.contains(&triple) {
results.push(triple);
}
}
}
}
Ok(results)
}
pub fn triples(&self) -> Vec<StarTriple> {
let mut all_triples = Vec::new();
{
let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
all_triples.extend(star_triples.clone());
}
{
let core_store = self.core_store.read().unwrap_or_else(|e| e.into_inner());
if let Ok(core_triples) = core_store.triples() {
drop(core_store); for core_triple in core_triples {
if let Ok(star_triple) = self.convert_from_core_triple(&core_triple) {
all_triples.push(star_triple);
}
}
}
}
all_triples
}
pub fn find_triples_containing_quoted(&self, quoted_triple: &StarTriple) -> Vec<StarTriple> {
let span = span!(Level::DEBUG, "find_triples_containing_quoted");
let _enter = span.enter();
let key = self.quoted_triple_key(quoted_triple);
let index = self
.quoted_triple_index
.read()
.unwrap_or_else(|e| e.into_inner());
let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
if let Some(indices) = index.signature_to_indices.get(&key) {
indices
.iter()
.filter_map(|&idx| star_triples.get(idx))
.cloned()
.collect()
} else {
Vec::new()
}
}
pub fn find_triples_by_quoted_pattern(
&self,
subject_pattern: Option<&StarTerm>,
predicate_pattern: Option<&StarTerm>,
object_pattern: Option<&StarTerm>,
) -> Vec<StarTriple> {
let span = span!(Level::DEBUG, "find_triples_by_quoted_pattern");
let _enter = span.enter();
let index = self
.quoted_triple_index
.read()
.unwrap_or_else(|e| e.into_inner());
let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
let mut candidate_indices: Option<BTreeSet<usize>> = None;
if let Some(subject_term) = subject_pattern {
let mut found_indices = BTreeSet::new();
let subject_key = format!("SUBJ:{subject_term}");
if let Some(indices) = index.subject_index.get(&subject_key) {
found_indices.extend(indices);
}
let predicate_key = format!("PRED:{subject_term}");
if let Some(indices) = index.predicate_index.get(&predicate_key) {
found_indices.extend(indices);
}
let object_key = format!("OBJ:{subject_term}");
if let Some(indices) = index.object_index.get(&object_key) {
found_indices.extend(indices);
}
if found_indices.is_empty() {
return Vec::new(); }
candidate_indices = Some(found_indices);
}
if let Some(predicate_term) = predicate_pattern {
let predicate_key = format!("PRED:{predicate_term}");
if let Some(indices) = index.predicate_index.get(&predicate_key) {
if let Some(ref mut candidates) = candidate_indices {
*candidates = candidates.intersection(indices).cloned().collect();
} else {
candidate_indices = Some(indices.clone());
}
if candidate_indices
.as_ref()
.expect("candidate_indices should be Some after setting")
.is_empty()
{
return Vec::new(); }
} else {
return Vec::new(); }
}
if let Some(object_term) = object_pattern {
let object_key = format!("OBJ:{object_term}");
if let Some(indices) = index.object_index.get(&object_key) {
if let Some(ref mut candidates) = candidate_indices {
*candidates = candidates.intersection(indices).cloned().collect();
} else {
candidate_indices = Some(indices.clone());
}
if candidate_indices
.as_ref()
.expect("candidate_indices should be Some after setting")
.is_empty()
{
return Vec::new(); }
} else {
return Vec::new(); }
}
let final_indices = candidate_indices.unwrap_or_else(|| {
index
.signature_to_indices
.values()
.flat_map(|indices| indices.iter())
.cloned()
.collect()
});
final_indices
.iter()
.filter_map(|&idx| star_triples.get(idx))
.cloned()
.collect()
}
pub fn find_triples_by_nesting_depth(
&self,
min_depth: usize,
max_depth: Option<usize>,
) -> Vec<StarTriple> {
let span = span!(Level::DEBUG, "find_triples_by_nesting_depth");
let _enter = span.enter();
let mut results = Vec::new();
let max_d = max_depth.unwrap_or(usize::MAX);
if min_depth == 0 {
let core_store = self.core_store.read().unwrap_or_else(|e| e.into_inner());
if let Ok(quads) = core_store.find_quads(None, None, None, None) {
for quad in quads {
let core_triple = quad.to_triple();
if let Ok(star_triple) = self.convert_from_core_triple(&core_triple) {
if !star_triple.contains_quoted_triples() {
results.push(star_triple);
}
}
}
}
}
let index = self
.quoted_triple_index
.read()
.unwrap_or_else(|e| e.into_inner());
let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
let mut result_indices = BTreeSet::new();
for (&_depth, indices) in index.nesting_depth_index.range(min_depth..=max_d) {
result_indices.extend(indices);
}
results.extend(
result_indices
.iter()
.filter_map(|&idx: &usize| star_triples.get(idx))
.cloned(),
);
results
}
pub fn to_graph(&self) -> StarGraph {
let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
let mut graph = StarGraph::new();
for triple in star_triples.iter() {
graph
.insert(triple.clone())
.expect("triple should be valid after validation on insert");
}
let core_store = self.core_store.read().unwrap_or_else(|e| e.into_inner());
if let Ok(quads) = core_store.find_quads(None, None, None, None) {
for quad in quads {
let core_triple = quad.to_triple();
if let Ok(star_triple) = self.convert_from_core_triple(&core_triple) {
if !star_triple.contains_quoted_triples() {
graph
.insert(star_triple)
.expect("triple should be valid after core store validation");
}
}
}
}
graph
}
pub fn from_graph(&self, graph: &StarGraph) -> StarResult<()> {
let span = span!(Level::INFO, "import_from_graph");
let _enter = span.enter();
for triple in graph.triples() {
self.insert(triple)?;
}
info!("Imported {} triples from graph", graph.len());
Ok(())
}
pub fn query_triples(
&self,
subject: Option<&StarTerm>,
predicate: Option<&StarTerm>,
object: Option<&StarTerm>,
) -> StarResult<Vec<StarTriple>> {
let mut results = Vec::new();
let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
for triple in star_triples.iter() {
if self.triple_matches(triple, subject, predicate, object) {
results.push(triple.clone());
}
}
let has_quoted_pattern = [subject, predicate, object]
.iter()
.any(|term| term.is_some_and(|t| t.is_quoted_triple()));
if !has_quoted_pattern {
let core_results = self.query_core_store(subject, predicate, object)?;
results.extend(core_results);
}
Ok(results)
}
pub(crate) fn query_core_store(
&self,
subject: Option<&StarTerm>,
predicate: Option<&StarTerm>,
object: Option<&StarTerm>,
) -> StarResult<Vec<StarTriple>> {
let core_store = self.core_store.read().unwrap_or_else(|e| e.into_inner());
let core_subject = match subject {
Some(term) => Some(conversion::star_term_to_subject(term)?),
None => None,
};
let core_predicate = match predicate {
Some(term) => Some(conversion::star_term_to_predicate(term)?),
None => None,
};
let core_object = match object {
Some(term) => Some(conversion::star_term_to_object(term)?),
None => None,
};
let core_quads = core_store
.find_quads(
core_subject.as_ref(),
core_predicate.as_ref(),
core_object.as_ref(),
None, )
.map_err(StarError::CoreError)?;
let mut results = Vec::new();
for quad in core_quads {
let triple = oxirs_core::model::Triple::new(
quad.subject().clone(),
quad.predicate().clone(),
quad.object().clone(),
);
let star_triple = self.convert_from_core_triple(&triple)?;
results.push(star_triple);
}
Ok(results)
}
pub(crate) fn convert_from_core_triple(
&self,
triple: &oxirs_core::model::Triple,
) -> StarResult<StarTriple> {
let subject = self.convert_subject_from_core(triple.subject())?;
let predicate = self.convert_predicate_from_core(triple.predicate())?;
let object = self.convert_object_from_core(triple.object())?;
Ok(StarTriple::new(subject, predicate, object))
}
pub(crate) fn convert_subject_from_core(
&self,
subject: &oxirs_core::model::Subject,
) -> StarResult<StarTerm> {
match subject {
oxirs_core::model::Subject::NamedNode(nn) => Ok(StarTerm::iri(nn.as_str())?),
oxirs_core::model::Subject::BlankNode(bn) => Ok(StarTerm::blank_node(bn.as_str())?),
oxirs_core::model::Subject::Variable(_) => Err(StarError::invalid_term_type(
"Variables are not supported in subjects for RDF-star storage".to_string(),
)),
oxirs_core::model::Subject::QuotedTriple(_) => Err(StarError::invalid_term_type(
"Quoted triples from core are not yet supported".to_string(),
)),
}
}
pub(crate) fn convert_predicate_from_core(
&self,
predicate: &oxirs_core::model::Predicate,
) -> StarResult<StarTerm> {
match predicate {
oxirs_core::model::Predicate::NamedNode(nn) => Ok(StarTerm::iri(nn.as_str())?),
oxirs_core::model::Predicate::Variable(_) => Err(StarError::invalid_term_type(
"Variables are not supported in predicates for RDF-star storage".to_string(),
)),
}
}
pub(crate) fn convert_object_from_core(
&self,
object: &oxirs_core::model::Object,
) -> StarResult<StarTerm> {
match object {
oxirs_core::model::Object::NamedNode(nn) => Ok(StarTerm::iri(nn.as_str())?),
oxirs_core::model::Object::BlankNode(bn) => Ok(StarTerm::blank_node(bn.as_str())?),
oxirs_core::model::Object::Literal(lit) => {
let language = lit.language().map(|lang| lang.to_string());
let datatype = if lit.is_lang_string() {
None
} else {
let dt_iri = lit.datatype().as_str();
if dt_iri == "http://www.w3.org/2001/XMLSchema#string" {
None
} else {
Some(crate::model::NamedNode {
iri: dt_iri.to_string(),
})
}
};
let star_literal = crate::model::Literal {
value: lit.value().to_string(),
language,
datatype,
};
Ok(StarTerm::Literal(star_literal))
}
oxirs_core::model::Object::Variable(_) => Err(StarError::invalid_term_type(
"Variables are not supported in objects for RDF-star storage".to_string(),
)),
oxirs_core::model::Object::QuotedTriple(_) => Err(StarError::invalid_term_type(
"Quoted triples from core are not yet supported".to_string(),
)),
}
}
pub fn all_triples(&self) -> Vec<StarTriple> {
let mut all_triples = Vec::new();
{
let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
all_triples.extend(star_triples.clone());
}
{
let core_store = self.core_store.read().unwrap_or_else(|e| e.into_inner());
if let Ok(quads) = core_store.find_quads(None, None, None, None) {
drop(core_store); for quad in quads {
let core_triple = quad.to_triple();
if let Ok(star_triple) = self.convert_from_core_triple(&core_triple) {
if !star_triple.contains_quoted_triples() {
all_triples.push(star_triple);
}
}
}
}
}
all_triples
}
pub fn iter(&self) -> impl Iterator<Item = StarTriple> + use<> {
self.all_triples().into_iter()
}
pub fn streaming_iter(&self, chunk_size: usize) -> StreamingTripleIterator<'_> {
StreamingTripleIterator::new(self, chunk_size)
}
}
pub struct StreamingTripleIterator<'a> {
store: &'a StarStore,
chunk_size: usize,
current_chunk: Vec<StarTriple>,
current_index: usize,
total_processed: usize,
}
impl<'a> StreamingTripleIterator<'a> {
pub(crate) fn new(store: &'a StarStore, chunk_size: usize) -> Self {
Self {
store,
chunk_size: chunk_size.max(1),
current_chunk: Vec::new(),
current_index: 0,
total_processed: 0,
}
}
fn load_next_chunk(&mut self) -> bool {
let all_triples = self.store.all_triples();
let start = self.total_processed;
let end = (start + self.chunk_size).min(all_triples.len());
if start >= all_triples.len() {
return false;
}
self.current_chunk.clear();
self.current_chunk
.extend(all_triples.iter().skip(start).take(end - start).cloned());
self.current_index = 0;
!self.current_chunk.is_empty()
}
}
impl<'a> Iterator for StreamingTripleIterator<'a> {
type Item = StarTriple;
fn next(&mut self) -> Option<Self::Item> {
if self.current_index >= self.current_chunk.len() && !self.load_next_chunk() {
return None;
}
let triple = self.current_chunk.get(self.current_index).cloned();
if triple.is_some() {
self.current_index += 1;
self.total_processed += 1;
}
triple
}
}