#[path = "store/bulk_insert.rs"]
mod bulk_insert_mod;
#[path = "store/cache.rs"]
mod cache_mod;
#[path = "store/conversion.rs"]
mod conversion;
#[path = "store/index.rs"]
mod index;
#[path = "store/pool.rs"]
mod pool_mod;
use std::collections::BTreeSet;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Instant;
use oxirs_core::rdf_store::{ConcreteStore as CoreStore, Store};
use tracing::{debug, info, span, Level};
use crate::model::{StarGraph, StarTerm, StarTriple};
use crate::{StarConfig, StarError, StarResult, StarStatistics};
use bulk_insert_mod::BulkInsertConfig;
use cache_mod::{CacheConfig, CacheStatistics, StarCache};
use index::{IndexStatistics, QuotedTripleIndex};
use pool_mod::ConnectionPool;
pub use bulk_insert_mod::BulkInsertConfig as PublicBulkInsertConfig;
pub use cache_mod::{
CacheConfig as PublicCacheConfig, CacheStatistics as PublicCacheStatistics,
StarCache as PublicStarCache,
};
pub use index::IndexStatistics as PublicIndexStatistics;
pub use pool_mod::{
ConnectionPool as PublicConnectionPool, PoolStatistics as PublicPoolStatistics,
PooledConnection as PublicPooledConnection,
};
#[allow(unused_imports)]
use pool_mod::{PoolStatistics, PooledConnection};
/// RDF-star triple store that splits storage between a core RDF store and an
/// in-memory list of triples containing quoted triples.
///
/// Every shared field is wrapped in an `Arc`, so cloning a `StarStore` yields a
/// handle onto the same underlying data; only `config` is copied by value.
#[derive(Clone)]
pub struct StarStore {
// Backing store for triples that contain no quoted triples.
core_store: Arc<RwLock<CoreStore>>,
// Triples that contain at least one quoted triple, addressed by position.
star_triples: Arc<RwLock<Vec<StarTriple>>>,
// Secondary indices mapping keys to positions in `star_triples`.
quoted_triple_index: Arc<RwLock<QuotedTripleIndex>>,
// Store configuration (e.g. `max_nesting_depth` checked on insert).
config: StarConfig,
// Running counters updated by insert and bulk-insert operations.
statistics: Arc<RwLock<StarStatistics>>,
// Pattern-result cache used by `get_triples_cached`.
cache: Arc<StarCache>,
// Bookkeeping for an in-progress bulk insertion.
bulk_insert_state: Arc<RwLock<BulkInsertState>>,
// Settings recorded by `enable_memory_mapping`.
memory_mapped: Arc<RwLock<MemoryMappedState>>,
}
/// Mutable bookkeeping shared by the bulk-insert routines.
#[derive(Debug, Default)]
struct BulkInsertState {
// Set to `true` when `bulk_insert` starts and back to `false` when it is finalized.
active: bool,
// Star triples buffered while index updates are deferred.
pending_triples: Vec<StarTriple>,
// Sum of the estimated sizes of the buffered triples; reset on every flush.
current_memory_usage: usize,
// Number of batches processed during the current bulk insertion.
batch_count: usize,
}
/// Settings recorded by `enable_memory_mapping`.
///
/// NOTE(review): the visible code only stores these values; no actual memory
/// mapping is performed in this file — confirm whether that lives elsewhere.
#[derive(Debug, Default)]
struct MemoryMappedState {
// Whether memory-mapped storage has been requested.
enabled: bool,
// Path passed to `enable_memory_mapping`, if any.
file_path: Option<String>,
// Whether compression was requested together with memory mapping.
compression_enabled: bool,
// Instant at which the settings were last written.
last_sync: Option<Instant>,
}
impl StarStore {
/// Creates an empty store using `StarConfig::default()`.
///
/// Delegates to [`StarStore::with_config`], so it shares that function's panic
/// behavior when the core store cannot be created.
pub fn new() -> Self {
Self::with_config(StarConfig::default())
}
/// Creates an empty store that uses the supplied configuration.
///
/// The cache is always built from `CacheConfig::default()`; all other state
/// starts out empty.
///
/// # Panics
///
/// Panics with "Failed to create core store" if `CoreStore::new()` fails.
pub fn with_config(config: StarConfig) -> Self {
    let span = span!(Level::INFO, "new_star_store");
    let _enter = span.enter();
    info!("Creating new RDF-star store with optimizations");
    debug!("Configuration: {:?}", config);
    Self {
        core_store: Arc::new(RwLock::new(
            CoreStore::new().expect("Failed to create core store"),
        )),
        star_triples: Arc::new(RwLock::new(Vec::new())),
        quoted_triple_index: Arc::new(RwLock::new(QuotedTripleIndex::new())),
        // `config` is owned and not used afterwards, so it is moved rather than cloned.
        config,
        statistics: Arc::new(RwLock::new(StarStatistics::default())),
        cache: Arc::new(StarCache::new(CacheConfig::default())),
        bulk_insert_state: Arc::new(RwLock::new(BulkInsertState::default())),
        memory_mapped: Arc::new(RwLock::new(MemoryMappedState::default())),
    }
}
/// Returns a reference to the configuration this store was created with
/// (or last received through `update_config`).
pub fn config(&self) -> &StarConfig {
&self.config
}
/// Returns every stored triple matching the given pattern.
///
/// Each of `subject`, `predicate` and `object` is either a term that must be
/// equal to the corresponding position, or `None` to match anything.
///
/// Star triples are scanned first. When no pattern term is a quoted triple,
/// the result of `all_triples()` is scanned as well and matching triples
/// without quoted triples are appended if they are not already present.
pub fn query(
    &self,
    subject: Option<&StarTerm>,
    predicate: Option<&StarTerm>,
    object: Option<&StarTerm>,
) -> StarResult<Vec<StarTriple>> {
    let matches_pattern = |triple: &StarTriple| {
        subject.map_or(true, |s| s == &triple.subject)
            && predicate.map_or(true, |p| p == &triple.predicate)
            && object.map_or(true, |o| o == &triple.object)
    };

    // The read guard is confined to this block: `all_triples()` below locks
    // `star_triples` again, and re-acquiring a std `RwLock` read lock on the
    // same thread may deadlock (or panic) when a writer is waiting.
    let mut results: Vec<StarTriple> = {
        let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
        star_triples
            .iter()
            .filter(|triple| matches_pattern(*triple))
            .cloned()
            .collect()
    };

    let pattern_has_quoted = [subject, predicate, object]
        .iter()
        .any(|term| matches!(term, Some(StarTerm::QuotedTriple(_))));

    if !pattern_has_quoted {
        for triple in self.all_triples() {
            if !triple.contains_quoted_triples()
                && matches_pattern(&triple)
                && !results.contains(&triple)
            {
                results.push(triple);
            }
        }
    }
    Ok(results)
}
/// Validates and inserts a single triple.
///
/// Triples containing quoted triples go to the star-triple list (and its
/// indices); all others go to the core store. Statistics are updated after a
/// successful insertion.
///
/// # Errors
///
/// Returns an error when `triple.validate()` fails, when any term exceeds
/// `config.max_nesting_depth`, or when the underlying insertion fails.
pub fn insert(&self, triple: &StarTriple) -> StarResult<()> {
    let span = span!(Level::DEBUG, "insert_triple");
    let _enter = span.enter();
    let start_time = Instant::now();

    triple.validate()?;
    crate::validate_nesting_depth(&triple.subject, self.config.max_nesting_depth)?;
    crate::validate_nesting_depth(&triple.predicate, self.config.max_nesting_depth)?;
    crate::validate_nesting_depth(&triple.object, self.config.max_nesting_depth)?;

    // Computed once; diagnostics go through `tracing` instead of raw stderr.
    let has_quoted = triple.contains_quoted_triples();
    debug!("Triple contains quoted triples: {}", has_quoted);
    if has_quoted {
        debug!("Inserting as star triple");
        self.insert_star_triple(triple)?;
    } else {
        debug!("Inserting as regular triple");
        self.insert_regular_triple(triple)?;
    }

    {
        let mut stats = self.statistics.write().unwrap_or_else(|e| e.into_inner());
        stats.processing_time_us += start_time.elapsed().as_micros() as u64;
        if has_quoted {
            stats.quoted_triples_count += 1;
            stats.max_nesting_encountered =
                stats.max_nesting_encountered.max(triple.nesting_depth());
        }
    }

    debug!("Inserted triple: {}", triple);
    Ok(())
}
fn insert_regular_triple(&self, triple: &StarTriple) -> StarResult<()> {
eprintln!("DEBUG: Inserting regular triple into core store");
let core_triple = self.convert_to_core_triple(triple)?;
eprintln!("DEBUG: Converted to core triple successfully");
let core_quad = oxirs_core::model::Quad::from_triple(core_triple);
eprintln!("DEBUG: Created core quad for insertion: {core_quad:?}");
let core_store = self.core_store.write().unwrap_or_else(|e| e.into_inner());
let result = CoreStore::insert_quad(&core_store, core_quad).map_err(StarError::CoreError);
eprintln!("DEBUG: Core store insert result: {result:?}");
result?;
eprintln!("DEBUG: Successfully inserted regular triple");
Ok(())
}
/// Builds the core-store representation of `triple`, converting the
/// subject, predicate and object in that order.
///
/// # Errors
///
/// Propagates the first term-conversion failure.
fn convert_to_core_triple(&self, triple: &StarTriple) -> StarResult<oxirs_core::model::Triple> {
    use oxirs_core::model::Triple as CoreTriple;

    Ok(CoreTriple::new(
        conversion::star_term_to_subject(&triple.subject)?,
        conversion::star_term_to_predicate(&triple.predicate)?,
        conversion::star_term_to_object(&triple.object)?,
    ))
}
/// Appends `triple` to the star-triple list and indexes its quoted triples
/// under the position it was stored at.
///
/// Locks are taken in the order `star_triples`, then `quoted_triple_index`.
fn insert_star_triple(&self, triple: &StarTriple) -> StarResult<()> {
    let mut stored = self.star_triples.write().unwrap_or_else(|e| e.into_inner());
    let mut quoted_index = self
        .quoted_triple_index
        .write()
        .unwrap_or_else(|e| e.into_inner());

    // The new triple lands at the current end of the list.
    let position = stored.len();
    stored.push(triple.clone());
    self.index_quoted_triples(triple, position, &mut quoted_index);

    debug!(
        "Inserted star triple with {} quoted triples",
        self.count_quoted_triples_in_triple(triple)
    );
    Ok(())
}
/// Registers `triple` (stored at `triple_index`) in all quoted-triple indices
/// and in the nesting-depth index.
fn index_quoted_triples(
    &self,
    triple: &StarTriple,
    triple_index: usize,
    index: &mut QuotedTripleIndex,
) {
    // Signature / subject / predicate / object entries for every quoted triple.
    self.index_quoted_triples_recursive(triple, triple_index, index);

    // One entry per triple in the depth index, keyed by its nesting depth.
    let nesting = triple.nesting_depth();
    let same_depth = index.nesting_depth_index.entry(nesting).or_default();
    same_depth.insert(triple_index);
}
/// Walks the quoted triples nested inside `triple` and records `triple_index`
/// under their signature and component keys.
///
/// NOTE(review): a quoted triple in subject position is only indexed under
/// its signature and its `SUBJ:` key, while quoted triples in predicate or
/// object position are indexed under `SUBJ:`, `PRED:` and `OBJ:` keys. This
/// asymmetry is preserved here — confirm whether it is intentional.
fn index_quoted_triples_recursive(
    &self,
    triple: &StarTriple,
    triple_index: usize,
    index: &mut QuotedTripleIndex,
) {
    if let StarTerm::QuotedTriple(inner) = &triple.subject {
        index
            .signature_to_indices
            .entry(self.quoted_triple_key(inner))
            .or_default()
            .insert(triple_index);
        index
            .subject_index
            .entry(format!("SUBJ:{}", inner.subject))
            .or_default()
            .insert(triple_index);
        self.index_quoted_triples_recursive(inner, triple_index, index);
    }

    if let StarTerm::QuotedTriple(inner) = &triple.predicate {
        index
            .signature_to_indices
            .entry(self.quoted_triple_key(inner))
            .or_default()
            .insert(triple_index);
        index
            .predicate_index
            .entry(format!("PRED:{}", inner.predicate))
            .or_default()
            .insert(triple_index);
        index
            .subject_index
            .entry(format!("SUBJ:{}", inner.subject))
            .or_default()
            .insert(triple_index);
        index
            .object_index
            .entry(format!("OBJ:{}", inner.object))
            .or_default()
            .insert(triple_index);
        self.index_quoted_triples_recursive(inner, triple_index, index);
    }

    if let StarTerm::QuotedTriple(inner) = &triple.object {
        index
            .signature_to_indices
            .entry(self.quoted_triple_key(inner))
            .or_default()
            .insert(triple_index);
        index
            .object_index
            .entry(format!("OBJ:{}", inner.object))
            .or_default()
            .insert(triple_index);
        index
            .subject_index
            .entry(format!("SUBJ:{}", inner.subject))
            .or_default()
            .insert(triple_index);
        index
            .predicate_index
            .entry(format!("PRED:{}", inner.predicate))
            .or_default()
            .insert(triple_index);
        self.index_quoted_triples_recursive(inner, triple_index, index);
    }
}
/// Builds the signature key of a triple: its subject, predicate and object
/// rendered with `Display` and joined by `|`.
fn quoted_triple_key(&self, triple: &StarTriple) -> String {
    let (s, p, o) = (&triple.subject, &triple.predicate, &triple.object);
    format!("{s}|{p}|{o}")
}
/// Adjusts a set of positions after the element at `pos` was removed from the
/// underlying list: `pos` itself is dropped and every larger position moves
/// down by one.
fn update_indices_after_removal(indices: &mut BTreeSet<usize>, pos: usize) {
    // `indices` keeps everything below `pos`; `at_or_after` holds the rest.
    let at_or_after = indices.split_off(&pos);
    indices.extend(
        at_or_after
            .into_iter()
            .filter(|&idx| idx != pos)
            .map(|idx| idx - 1),
    );
}
/// Counts the quoted triples nested anywhere inside `triple`, visiting the
/// subject, predicate and object positions recursively.
#[allow(clippy::only_used_in_recursion)]
fn count_quoted_triples_in_triple(&self, triple: &StarTriple) -> usize {
    let mut total = 0;
    for term in [&triple.subject, &triple.predicate, &triple.object] {
        if !term.is_quoted_triple() {
            continue;
        }
        total += 1;
        if let StarTerm::QuotedTriple(inner) = term {
            total += self.count_quoted_triples_in_triple(inner);
        }
    }
    total
}
pub fn remove(&self, triple: &StarTriple) -> StarResult<bool> {
let span = span!(Level::DEBUG, "remove_triple");
let _enter = span.enter();
eprintln!("DEBUG: Attempting to remove triple: {triple}");
eprintln!(
"DEBUG: Triple contains quoted triples: {}",
triple.contains_quoted_triples()
);
if triple.contains_quoted_triples() {
let mut star_triples = self.star_triples.write().unwrap_or_else(|e| e.into_inner());
if let Some(pos) = star_triples.iter().position(|t| t == triple) {
star_triples.remove(pos);
let mut index = self
.quoted_triple_index
.write()
.unwrap_or_else(|e| e.into_inner());
for (_, indices) in index.signature_to_indices.iter_mut() {
Self::update_indices_after_removal(indices, pos);
}
for (_, indices) in index.subject_index.iter_mut() {
Self::update_indices_after_removal(indices, pos);
}
for (_, indices) in index.predicate_index.iter_mut() {
Self::update_indices_after_removal(indices, pos);
}
for (_, indices) in index.object_index.iter_mut() {
Self::update_indices_after_removal(indices, pos);
}
for (_, indices) in index.nesting_depth_index.iter_mut() {
Self::update_indices_after_removal(indices, pos);
}
debug!("Removed star triple: {}", triple);
return Ok(true);
}
} else {
eprintln!("DEBUG: Attempting to remove regular triple from core store");
let core_store = self.core_store.write().unwrap_or_else(|e| e.into_inner());
if let Ok(core_triple) = self.convert_to_core_triple(triple) {
eprintln!("DEBUG: Successfully converted to core triple");
let core_quad = oxirs_core::model::Quad::from_triple(core_triple);
eprintln!("DEBUG: Created core quad: {core_quad:?}");
match CoreStore::remove_quad(&core_store, &core_quad) {
Ok(removed) => {
eprintln!("DEBUG: Core store remove_quad returned: {removed}");
if removed {
eprintln!("DEBUG: Removed regular triple: {triple}");
return Ok(true);
} else {
eprintln!(
"DEBUG: Core store remove_quad returned false - triple not found"
);
}
}
Err(e) => {
eprintln!("DEBUG: Core store remove_quad failed with error: {e:?}");
}
}
} else {
eprintln!("DEBUG: Failed to convert triple to core triple");
}
}
Ok(false)
}
/// Reports whether `triple` is present in the store.
///
/// The star-triple list is checked first; triples without quoted triples are
/// then looked up in the core store. Conversion or lookup failures count as
/// "not present".
pub fn contains(&self, triple: &StarTriple) -> bool {
    let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
    if star_triples.contains(triple) {
        return true;
    }
    if triple.contains_quoted_triples() {
        return false;
    }

    let core_store = self.core_store.read().unwrap_or_else(|e| e.into_inner());
    let Ok(core_triple) = self.convert_to_core_triple(triple) else {
        return false;
    };
    let core_quad = oxirs_core::model::Quad::from_triple(core_triple);
    let lookup = core_store.find_quads(
        Some(core_quad.subject()),
        Some(core_quad.predicate()),
        Some(core_quad.object()),
        Some(core_quad.graph_name()),
    );
    let found = match lookup {
        Ok(quads) => !quads.is_empty(),
        Err(_) => false,
    };
    found
}
/// Returns a snapshot of all triples: the star triples followed by every
/// core-store triple that converts successfully.
///
/// Core-store read errors and per-triple conversion errors are skipped.
pub fn triples(&self) -> Vec<StarTriple> {
    let mut collected: Vec<StarTriple> = {
        let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
        star_triples.iter().cloned().collect()
    };

    // The core-store guard is released before the conversion loop runs.
    let core_result = {
        let core_store = self.core_store.read().unwrap_or_else(|e| e.into_inner());
        core_store.triples()
    };
    if let Ok(core_triples) = core_result {
        for core_triple in core_triples {
            if let Ok(converted) = self.convert_from_core_triple(&core_triple) {
                collected.push(converted);
            }
        }
    }
    collected
}
/// Returns the star triples indexed under the signature of `quoted_triple`.
///
/// The signature is the `subject|predicate|object` key produced by
/// `quoted_triple_key`; an unknown signature yields an empty vector.
pub fn find_triples_containing_quoted(&self, quoted_triple: &StarTriple) -> Vec<StarTriple> {
    let span = span!(Level::DEBUG, "find_triples_containing_quoted");
    let _enter = span.enter();

    let key = self.quoted_triple_key(quoted_triple);

    // Lock order must be `star_triples` before `quoted_triple_index`, the
    // same order the writers (insert / remove / optimize / flush) use;
    // taking them the other way round can deadlock against a writer.
    let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
    let index = self
        .quoted_triple_index
        .read()
        .unwrap_or_else(|e| e.into_inner());

    match index.signature_to_indices.get(&key) {
        Some(indices) => indices
            .iter()
            .filter_map(|&idx| star_triples.get(idx))
            .cloned()
            .collect(),
        None => Vec::new(),
    }
}
/// Finds star triples through the quoted-triple component indices.
///
/// * `subject_pattern` is looked up under the `SUBJ:`, `PRED:` and `OBJ:`
///   keys and the union of the hits forms the candidate set.
///   NOTE(review): probing the predicate and object indices with the subject
///   term is kept from the original — confirm it is intended.
/// * `predicate_pattern` and `object_pattern` are looked up under `PRED:` and
///   `OBJ:` respectively and intersected with the current candidates.
///
/// Any pattern without a hit, or an empty intersection, yields an empty
/// result. With no pattern at all, every position known to the signature
/// index is returned.
pub fn find_triples_by_quoted_pattern(
    &self,
    subject_pattern: Option<&StarTerm>,
    predicate_pattern: Option<&StarTerm>,
    object_pattern: Option<&StarTerm>,
) -> Vec<StarTriple> {
    let span = span!(Level::DEBUG, "find_triples_by_quoted_pattern");
    let _enter = span.enter();

    // Lock order must be `star_triples` before `quoted_triple_index`, the
    // same order the writers use; the reverse order can deadlock.
    let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
    let index = self
        .quoted_triple_index
        .read()
        .unwrap_or_else(|e| e.into_inner());

    let mut candidate_indices: Option<BTreeSet<usize>> = None;

    if let Some(subject_term) = subject_pattern {
        let mut found_indices = BTreeSet::new();
        let subject_key = format!("SUBJ:{subject_term}");
        if let Some(indices) = index.subject_index.get(&subject_key) {
            found_indices.extend(indices);
        }
        let predicate_key = format!("PRED:{subject_term}");
        if let Some(indices) = index.predicate_index.get(&predicate_key) {
            found_indices.extend(indices);
        }
        let object_key = format!("OBJ:{subject_term}");
        if let Some(indices) = index.object_index.get(&object_key) {
            found_indices.extend(indices);
        }
        if found_indices.is_empty() {
            return Vec::new();
        }
        candidate_indices = Some(found_indices);
    }

    if let Some(predicate_term) = predicate_pattern {
        let predicate_key = format!("PRED:{predicate_term}");
        let Some(indices) = index.predicate_index.get(&predicate_key) else {
            return Vec::new();
        };
        let narrowed: BTreeSet<usize> = match candidate_indices.take() {
            Some(current) => current.intersection(indices).cloned().collect(),
            None => indices.clone(),
        };
        if narrowed.is_empty() {
            return Vec::new();
        }
        candidate_indices = Some(narrowed);
    }

    if let Some(object_term) = object_pattern {
        let object_key = format!("OBJ:{object_term}");
        let Some(indices) = index.object_index.get(&object_key) else {
            return Vec::new();
        };
        let narrowed: BTreeSet<usize> = match candidate_indices.take() {
            Some(current) => current.intersection(indices).cloned().collect(),
            None => indices.clone(),
        };
        if narrowed.is_empty() {
            return Vec::new();
        }
        candidate_indices = Some(narrowed);
    }

    // No pattern given: fall back to every position in the signature index.
    let final_indices = candidate_indices.unwrap_or_else(|| {
        index
            .signature_to_indices
            .values()
            .flat_map(|indices| indices.iter())
            .cloned()
            .collect()
    });

    final_indices
        .iter()
        .filter_map(|&idx| star_triples.get(idx))
        .cloned()
        .collect()
}
/// Returns the triples whose nesting depth lies in `min_depth..=max_depth`
/// (`max_depth = None` means unbounded).
///
/// When `min_depth` is 0, core-store triples without quoted triples are
/// included first; star triples are then selected through the nesting-depth
/// index. An inverted range (`min_depth > max_depth`) yields an empty result.
pub fn find_triples_by_nesting_depth(
    &self,
    min_depth: usize,
    max_depth: Option<usize>,
) -> Vec<StarTriple> {
    let span = span!(Level::DEBUG, "find_triples_by_nesting_depth");
    let _enter = span.enter();

    let max_d = max_depth.unwrap_or(usize::MAX);
    // `BTreeMap::range` panics when the start bound exceeds the end bound,
    // so an inverted range is answered up front.
    if min_depth > max_d {
        return Vec::new();
    }

    let mut results = Vec::new();
    if min_depth == 0 {
        let core_store = self.core_store.read().unwrap_or_else(|e| e.into_inner());
        if let Ok(quads) = core_store.find_quads(None, None, None, None) {
            for quad in quads {
                let core_triple = quad.to_triple();
                if let Ok(star_triple) = self.convert_from_core_triple(&core_triple) {
                    if !star_triple.contains_quoted_triples() {
                        results.push(star_triple);
                    }
                }
            }
        }
    }

    // Lock order must be `star_triples` before `quoted_triple_index`, the
    // same order the writers use; the reverse order can deadlock.
    let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
    let index = self
        .quoted_triple_index
        .read()
        .unwrap_or_else(|e| e.into_inner());

    let mut result_indices = BTreeSet::new();
    for (&_depth, indices) in index.nesting_depth_index.range(min_depth..=max_d) {
        result_indices.extend(indices);
    }
    results.extend(
        result_indices
            .iter()
            .filter_map(|&idx: &usize| star_triples.get(idx))
            .cloned(),
    );
    results
}
/// Returns the total number of stored triples: star triples plus the
/// core-store count (treated as 0 when the core store reports an error).
pub fn len(&self) -> usize {
    // Both guards are held together so the two counts are read side by side.
    let star_guard = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
    let core_guard = self.core_store.read().unwrap_or_else(|e| e.into_inner());
    core_guard.len().unwrap_or(0) + star_guard.len()
}
/// Returns `true` when there are no star triples and the core store is empty
/// (a core-store error is treated as "empty").
pub fn is_empty(&self) -> bool {
    let star_guard = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
    let core_guard = self.core_store.read().unwrap_or_else(|e| e.into_inner());
    if !star_guard.is_empty() {
        return false;
    }
    core_guard.is_empty().unwrap_or(true)
}
/// Removes every triple, resets all indices and resets the statistics.
///
/// The replacement core store is created before anything is modified, so a
/// failure leaves the store untouched. All write locks are then held
/// together, which keeps the star-triple list and its indices consistent
/// with respect to concurrent inserts.
///
/// NOTE(review): the pattern cache used by `get_triples_cached` is not
/// invalidated here — confirm whether `StarCache` offers a way to clear it.
///
/// # Errors
///
/// Returns `StarError::CoreError` when a fresh core store cannot be created.
pub fn clear(&self) -> StarResult<()> {
    let span = span!(Level::INFO, "clear_store");
    let _enter = span.enter();

    // Fallible step first: nothing has been mutated if this fails.
    let fresh_core = CoreStore::new().map_err(StarError::CoreError)?;

    {
        // `star_triples` is locked before `quoted_triple_index`, matching
        // the order used by the other writers.
        let mut star_triples = self.star_triples.write().unwrap_or_else(|e| e.into_inner());
        let mut core_store = self.core_store.write().unwrap_or_else(|e| e.into_inner());
        let mut index = self
            .quoted_triple_index
            .write()
            .unwrap_or_else(|e| e.into_inner());
        let mut stats = self.statistics.write().unwrap_or_else(|e| e.into_inner());

        star_triples.clear();
        *core_store = fresh_core;
        index.clear();
        *stats = StarStatistics::default();
    }

    info!("Cleared all triples from store");
    Ok(())
}
/// Returns a copy of the current statistics counters.
pub fn statistics(&self) -> StarStatistics {
    self.statistics
        .read()
        .unwrap_or_else(|e| e.into_inner())
        .clone()
}
/// Copies the store contents into a new `StarGraph`.
///
/// Star triples are inserted first, followed by every core-store triple that
/// converts successfully and contains no quoted triples.
///
/// # Panics
///
/// Panics if the graph rejects a triple.
pub fn to_graph(&self) -> StarGraph {
    let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
    let mut graph = StarGraph::new();
    star_triples.iter().cloned().for_each(|stored| {
        graph
            .insert(stored)
            .expect("triple should be valid after validation on insert");
    });

    let core_store = self.core_store.read().unwrap_or_else(|e| e.into_inner());
    let Ok(quads) = core_store.find_quads(None, None, None, None) else {
        return graph;
    };
    for quad in quads {
        match self.convert_from_core_triple(&quad.to_triple()) {
            Ok(converted) if !converted.contains_quoted_triples() => {
                graph
                    .insert(converted)
                    .expect("triple should be valid after core store validation");
            }
            _ => {}
        }
    }
    graph
}
/// Inserts every triple of `graph` into this store, stopping at the first
/// insertion error.
///
/// # Errors
///
/// Propagates the first error returned by `insert`; triples inserted before
/// the failure remain in the store.
pub fn from_graph(&self, graph: &StarGraph) -> StarResult<()> {
    let span = span!(Level::INFO, "import_from_graph");
    let _enter = span.enter();

    graph
        .triples()
        .into_iter()
        .try_for_each(|triple| self.insert(triple))?;

    info!("Imported {} triples from graph", graph.len());
    Ok(())
}
/// Rebuilds all quoted-triple indices from the current star-triple list and
/// drops index entries whose position set is empty.
pub fn optimize(&self) -> StarResult<()> {
    let span = span!(Level::INFO, "optimize_store");
    let _enter = span.enter();

    let stored = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
    let mut rebuilt = self
        .quoted_triple_index
        .write()
        .unwrap_or_else(|e| e.into_inner());

    // Start from scratch and re-index every triple under its current position.
    rebuilt.clear();
    let with_quoted = stored
        .iter()
        .enumerate()
        .filter(|(_, triple)| triple.contains_quoted_triples());
    for (position, triple) in with_quoted {
        self.index_quoted_triples(triple, position, &mut rebuilt);
    }

    // Prune keys that ended up without any position.
    rebuilt
        .signature_to_indices
        .retain(|_, positions| !positions.is_empty());
    rebuilt
        .subject_index
        .retain(|_, positions| !positions.is_empty());
    rebuilt
        .predicate_index
        .retain(|_, positions| !positions.is_empty());
    rebuilt
        .object_index
        .retain(|_, positions| !positions.is_empty());
    rebuilt
        .nesting_depth_index
        .retain(|_, positions| !positions.is_empty());

    info!(
        "Store optimization completed - rebuilt {} index entries",
        rebuilt.signature_to_indices.len()
            + rebuilt.subject_index.len()
            + rebuilt.predicate_index.len()
            + rebuilt.object_index.len()
            + rebuilt.nesting_depth_index.len()
    );
    Ok(())
}
/// Returns the triples matching the pattern, star triples first.
///
/// The core store is only consulted when none of the pattern terms is a
/// quoted triple.
///
/// # Errors
///
/// Propagates errors from the core-store query.
pub fn query_triples(
    &self,
    subject: Option<&StarTerm>,
    predicate: Option<&StarTerm>,
    object: Option<&StarTerm>,
) -> StarResult<Vec<StarTriple>> {
    let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
    let mut matched: Vec<StarTriple> = star_triples
        .iter()
        .filter(|candidate| self.triple_matches(candidate, subject, predicate, object))
        .cloned()
        .collect();

    let pattern_is_quoted = [subject, predicate, object]
        .iter()
        .any(|term| term.is_some_and(|t| t.is_quoted_triple()));
    if pattern_is_quoted {
        return Ok(matched);
    }

    matched.extend(self.query_core_store(subject, predicate, object)?);
    Ok(matched)
}
/// Runs the pattern against the core store (any graph) and converts the
/// matching quads back into star triples.
///
/// # Errors
///
/// Fails when a pattern term cannot be converted to its core counterpart,
/// when the core store query fails (`StarError::CoreError`), or when a result
/// cannot be converted back.
fn query_core_store(
    &self,
    subject: Option<&StarTerm>,
    predicate: Option<&StarTerm>,
    object: Option<&StarTerm>,
) -> StarResult<Vec<StarTriple>> {
    let core_store = self.core_store.read().unwrap_or_else(|e| e.into_inner());

    // `None` stays `None`; a conversion failure aborts the query.
    let core_subject = subject
        .map(conversion::star_term_to_subject)
        .transpose()?;
    let core_predicate = predicate
        .map(conversion::star_term_to_predicate)
        .transpose()?;
    let core_object = object.map(conversion::star_term_to_object).transpose()?;

    let core_quads = core_store
        .find_quads(
            core_subject.as_ref(),
            core_predicate.as_ref(),
            core_object.as_ref(),
            None,
        )
        .map_err(StarError::CoreError)?;

    let mut converted = Vec::new();
    for quad in core_quads {
        let as_triple = oxirs_core::model::Triple::new(
            quad.subject().clone(),
            quad.predicate().clone(),
            quad.object().clone(),
        );
        converted.push(self.convert_from_core_triple(&as_triple)?);
    }
    Ok(converted)
}
/// Converts a core-store triple into a `StarTriple`, converting subject,
/// predicate and object in that order.
///
/// # Errors
///
/// Propagates the first term-conversion failure.
fn convert_from_core_triple(
    &self,
    triple: &oxirs_core::model::Triple,
) -> StarResult<StarTriple> {
    Ok(StarTriple::new(
        self.convert_subject_from_core(triple.subject())?,
        self.convert_predicate_from_core(triple.predicate())?,
        self.convert_object_from_core(triple.object())?,
    ))
}
/// Converts a core subject into a `StarTerm`.
///
/// Named and blank nodes are supported; variables and core quoted triples
/// are rejected with an invalid-term-type error.
fn convert_subject_from_core(
    &self,
    subject: &oxirs_core::model::Subject,
) -> StarResult<StarTerm> {
    use oxirs_core::model::Subject as CoreSubject;

    match subject {
        CoreSubject::NamedNode(named) => Ok(StarTerm::iri(named.as_str())?),
        CoreSubject::BlankNode(blank) => Ok(StarTerm::blank_node(blank.as_str())?),
        CoreSubject::Variable(_) => {
            let message =
                "Variables are not supported in subjects for RDF-star storage".to_string();
            Err(StarError::invalid_term_type(message))
        }
        CoreSubject::QuotedTriple(_) => {
            let message = "Quoted triples from core are not yet supported".to_string();
            Err(StarError::invalid_term_type(message))
        }
    }
}
/// Converts a core predicate into a `StarTerm`.
///
/// Only named nodes are supported; variables are rejected with an
/// invalid-term-type error.
fn convert_predicate_from_core(
    &self,
    predicate: &oxirs_core::model::Predicate,
) -> StarResult<StarTerm> {
    use oxirs_core::model::Predicate as CorePredicate;

    match predicate {
        CorePredicate::NamedNode(named) => Ok(StarTerm::iri(named.as_str())?),
        CorePredicate::Variable(_) => {
            let message =
                "Variables are not supported in predicates for RDF-star storage".to_string();
            Err(StarError::invalid_term_type(message))
        }
    }
}
/// Converts a core object into a `StarTerm`.
///
/// Named nodes, blank nodes and literals are supported. For literals the
/// datatype is omitted when the literal is a language-tagged string or when
/// its datatype is `xsd:string`. Variables and core quoted triples are
/// rejected with an invalid-term-type error.
fn convert_object_from_core(&self, object: &oxirs_core::model::Object) -> StarResult<StarTerm> {
    use oxirs_core::model::Object as CoreObject;

    match object {
        CoreObject::NamedNode(named) => Ok(StarTerm::iri(named.as_str())?),
        CoreObject::BlankNode(blank) => Ok(StarTerm::blank_node(blank.as_str())?),
        CoreObject::Literal(lit) => {
            let language = lit.language().map(|lang| lang.to_string());
            let datatype = if lit.is_lang_string() {
                None
            } else {
                Some(lit.datatype().as_str())
                    .filter(|dt_iri| *dt_iri != "http://www.w3.org/2001/XMLSchema#string")
                    .map(|dt_iri| crate::model::NamedNode {
                        iri: dt_iri.to_string(),
                    })
            };
            Ok(StarTerm::Literal(crate::model::Literal {
                value: lit.value().to_string(),
                language,
                datatype,
            }))
        }
        CoreObject::Variable(_) => {
            let message =
                "Variables are not supported in objects for RDF-star storage".to_string();
            Err(StarError::invalid_term_type(message))
        }
        CoreObject::QuotedTriple(_) => {
            let message = "Quoted triples from core are not yet supported".to_string();
            Err(StarError::invalid_term_type(message))
        }
    }
}
/// Checks `triple` against a pattern in which `None` matches any term and
/// `Some(term)` requires the corresponding position not to differ from `term`.
fn triple_matches(
    &self,
    triple: &StarTriple,
    subject: Option<&StarTerm>,
    predicate: Option<&StarTerm>,
    object: Option<&StarTerm>,
) -> bool {
    let subject_ok = subject.map_or(true, |s| !(&triple.subject != s));
    let predicate_ok = || predicate.map_or(true, |p| !(&triple.predicate != p));
    let object_ok = || object.map_or(true, |o| !(&triple.object != o));
    subject_ok && predicate_ok() && object_ok()
}
/// Re-initializes the star system with `config` and, on success, stores it as
/// this store's configuration.
///
/// # Errors
///
/// Propagates the error from `crate::init_star_system`; the stored
/// configuration is left unchanged in that case.
pub fn update_config(&mut self, config: StarConfig) -> StarResult<()> {
    let for_init = config.clone();
    crate::init_star_system(for_init)?;
    self.config = config;
    Ok(())
}
}
/// `StarStore::default()` is equivalent to `StarStore::new()`.
impl Default for StarStore {
fn default() -> Self {
Self::new()
}
}
impl StarStore {
/// Returns a snapshot of all triples: the star triples followed by every
/// core-store triple that converts successfully and contains no quoted
/// triples.
///
/// Core-store read errors and per-triple conversion errors are skipped.
pub fn all_triples(&self) -> Vec<StarTriple> {
    let mut collected: Vec<StarTriple> = {
        let star_triples = self.star_triples.read().unwrap_or_else(|e| e.into_inner());
        star_triples.iter().cloned().collect()
    };

    // The core-store guard is released before the conversion loop runs.
    let quad_result = {
        let core_store = self.core_store.read().unwrap_or_else(|e| e.into_inner());
        core_store.find_quads(None, None, None, None)
    };
    if let Ok(quads) = quad_result {
        for quad in quads {
            match self.convert_from_core_triple(&quad.to_triple()) {
                Ok(converted) if !converted.contains_quoted_triples() => {
                    collected.push(converted);
                }
                _ => {}
            }
        }
    }
    collected
}
/// Returns an iterator over a snapshot of all triples.
///
/// The snapshot is produced by `all_triples()` and owned by the iterator, so
/// later changes to the store are not reflected.
pub fn iter(&self) -> impl Iterator<Item = StarTriple> + use<> {
self.all_triples().into_iter()
}
/// Returns an iterator that yields the store's triples in chunks of
/// `chunk_size` (a value of 0 is raised to 1 by the iterator's constructor).
pub fn streaming_iter(&self, chunk_size: usize) -> StreamingTripleIterator<'_> {
StreamingTripleIterator::new(self, chunk_size)
}
/// Inserts many triples at once.
///
/// The parallel path is used when `config.parallel_processing` is set and
/// there are more triples than `config.batch_size`; otherwise the triples are
/// inserted sequentially. Pending triples are flushed and (if deferred) the
/// indices rebuilt afterwards, and the elapsed time is added to the
/// statistics.
///
/// # Errors
///
/// Propagates the first insertion or finalization error. In that case the
/// bulk-insert state is reset so the store does not stay marked as "active"
/// with stale pending triples.
pub fn bulk_insert(&self, triples: &[StarTriple], config: &BulkInsertConfig) -> StarResult<()> {
    let span = span!(Level::INFO, "bulk_insert", count = triples.len());
    let _enter = span.enter();
    info!("Starting bulk insertion of {} triples", triples.len());
    let start_time = Instant::now();

    {
        let mut bulk_state = self
            .bulk_insert_state
            .write()
            .unwrap_or_else(|e| e.into_inner());
        bulk_state.active = true;
        bulk_state.pending_triples.clear();
        bulk_state.current_memory_usage = 0;
        bulk_state.batch_count = 0;
    }

    let inserted = if config.parallel_processing && triples.len() > config.batch_size {
        self.bulk_insert_parallel(triples, config)
    } else {
        self.bulk_insert_sequential(triples, config)
    };
    let outcome = inserted.and_then(|()| self.finalize_bulk_insert(config));

    if let Err(err) = outcome {
        // Leave the bookkeeping in its idle state before reporting the error.
        let mut bulk_state = self
            .bulk_insert_state
            .write()
            .unwrap_or_else(|e| e.into_inner());
        bulk_state.active = false;
        bulk_state.pending_triples.clear();
        bulk_state.current_memory_usage = 0;
        bulk_state.batch_count = 0;
        return Err(err);
    }

    let elapsed = start_time.elapsed();
    info!(
        "Bulk insertion completed in {:?} for {} triples",
        elapsed,
        triples.len()
    );
    {
        let mut stats = self.statistics.write().unwrap_or_else(|e| e.into_inner());
        stats.processing_time_us += elapsed.as_micros() as u64;
    }
    Ok(())
}
/// Inserts `triples` batch by batch on the calling thread.
///
/// Star triples are buffered when `config.defer_index_updates` is set and
/// inserted (and indexed) immediately otherwise; regular triples always go
/// straight to the core store. After each batch the buffer is flushed once
/// its estimated size reaches `config.memory_threshold`.
///
/// A `config.batch_size` of 0 is treated as 1.
///
/// NOTE(review): unlike `insert`, this path does not check
/// `max_nesting_depth` or update the quoted-triple statistics — confirm
/// whether that is intended.
///
/// # Errors
///
/// Propagates the first validation or insertion error.
fn bulk_insert_sequential(
    &self,
    triples: &[StarTriple],
    config: &BulkInsertConfig,
) -> StarResult<()> {
    // `slice::chunks` panics on a chunk size of zero.
    let batch_size = config.batch_size.max(1);

    for batch in triples.chunks(batch_size) {
        for triple in batch {
            triple.validate()?;
            if !triple.contains_quoted_triples() {
                self.insert_regular_triple(triple)?;
            } else if config.defer_index_updates {
                let mut bulk_state = self
                    .bulk_insert_state
                    .write()
                    .unwrap_or_else(|e| e.into_inner());
                bulk_state.pending_triples.push(triple.clone());
                bulk_state.current_memory_usage += self.estimate_triple_memory_size(triple);
            } else {
                self.insert_star_triple(triple)?;
            }
        }

        // The read guard is a temporary and is released before flushing.
        let needs_flush = self
            .bulk_insert_state
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .current_memory_usage
            >= config.memory_threshold;
        if needs_flush {
            self.flush_pending_triples(config)?;
        }

        self.bulk_insert_state
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .batch_count += 1;
    }
    Ok(())
}
fn bulk_insert_parallel(
&self,
triples: &[StarTriple],
config: &BulkInsertConfig,
) -> StarResult<()> {
let chunk_size = triples.len() / config.worker_threads;
let mut handles = Vec::new();
for chunk in triples.chunks(chunk_size) {
let chunk = chunk.to_vec();
let store_clone = self.clone();
let config_clone = config.clone();
let handle =
thread::spawn(move || store_clone.bulk_insert_sequential(&chunk, &config_clone));
handles.push(handle);
}
for handle in handles {
handle
.join()
.map_err(|e| StarError::query_error(format!("Thread join error: {e:?}")))??;
}
Ok(())
}
/// Moves all buffered triples from the bulk-insert state into the
/// star-triple list and resets the buffered-size counter.
///
/// The moved triples are indexed here only when index updates are not
/// deferred; otherwise indexing is left to a later index rebuild.
fn flush_pending_triples(&self, config: &BulkInsertConfig) -> StarResult<()> {
    // Take the buffer out of the shared state instead of cloning it.
    let pending = {
        let mut bulk_state = self
            .bulk_insert_state
            .write()
            .unwrap_or_else(|e| e.into_inner());
        bulk_state.current_memory_usage = 0;
        std::mem::take(&mut bulk_state.pending_triples)
    };
    if pending.is_empty() {
        return Ok(());
    }
    debug!("Flushing {} pending triples", pending.len());

    let mut star_triples = self.star_triples.write().unwrap_or_else(|e| e.into_inner());
    let base_index = star_triples.len();
    if !config.defer_index_updates {
        // Index under the positions the triples are about to occupy; the
        // write guard on `star_triples` keeps readers out until the list and
        // the index agree again.
        let mut index = self
            .quoted_triple_index
            .write()
            .unwrap_or_else(|e| e.into_inner());
        for (offset, triple) in pending.iter().enumerate() {
            self.index_quoted_triples(triple, base_index + offset, &mut index);
        }
    }
    // Moves the triples into the list without another copy.
    star_triples.extend(pending);
    Ok(())
}
/// Completes a bulk insertion: flushes the remaining buffered triples,
/// rebuilds the indices when their updates were deferred, and returns the
/// bulk-insert state to idle.
///
/// # Errors
///
/// Propagates errors from flushing or from the index rebuild.
fn finalize_bulk_insert(&self, config: &BulkInsertConfig) -> StarResult<()> {
    self.flush_pending_triples(config)?;

    if config.defer_index_updates {
        info!("Rebuilding indices after bulk insertion");
        self.optimize()?;
    }

    let mut state = self
        .bulk_insert_state
        .write()
        .unwrap_or_else(|e| e.into_inner());
    state.active = false;
    state.pending_triples.clear();
    state.current_memory_usage = 0;
    state.batch_count = 0;
    drop(state);

    Ok(())
}
fn estimate_triple_memory_size(&self, triple: &StarTriple) -> usize {
let subject_size = match &triple.subject {
StarTerm::NamedNode(nn) => nn.iri.len(),
StarTerm::BlankNode(bn) => bn.id.len(),
StarTerm::Literal(lit) => lit.value.len(),
StarTerm::QuotedTriple(_) => 200, StarTerm::Variable(var) => var.name.len(),
};
let predicate_size = match &triple.predicate {
StarTerm::NamedNode(nn) => nn.iri.len(),
_ => 50, };
let object_size = match &triple.object {
StarTerm::NamedNode(nn) => nn.iri.len(),
StarTerm::BlankNode(bn) => bn.id.len(),
StarTerm::Literal(lit) => lit.value.len(),
StarTerm::QuotedTriple(_) => 200, StarTerm::Variable(var) => var.name.len(),
};
subject_size + predicate_size + object_size + 100 }
/// Records that memory-mapped storage is enabled for `file_path`, together
/// with the compression flag and the current instant as the last sync time.
///
/// NOTE(review): only the settings are stored here; no file is opened or
/// mapped in this function.
pub fn enable_memory_mapping(
    &self,
    file_path: &str,
    enable_compression: bool,
) -> StarResult<()> {
    let span = span!(Level::INFO, "enable_memory_mapping");
    let _enter = span.enter();
    info!("Enabling memory-mapped storage at: {}", file_path);

    let mut settings = self
        .memory_mapped
        .write()
        .unwrap_or_else(|e| e.into_inner());
    settings.enabled = true;
    settings.file_path = Some(file_path.to_string());
    settings.compression_enabled = enable_compression;
    settings.last_sync = Some(Instant::now());
    drop(settings);

    info!(
        "Memory-mapped storage enabled with compression: {}",
        enable_compression
    );
    Ok(())
}
/// Returns the triples for `pattern`, served from the cache when available.
///
/// On a miss the result is computed, stored in the cache under `pattern`,
/// and returned.
pub fn get_triples_cached(&self, pattern: &str) -> Vec<StarTriple> {
    let span = span!(Level::DEBUG, "get_triples_cached");
    let _enter = span.enter();

    let cached = self.cache.get(pattern);
    if let Some(hit) = cached {
        debug!("Cache hit for pattern: {}", pattern);
        return hit;
    }

    debug!("Cache miss for pattern: {}", pattern);
    let computed = self.compute_pattern_results(pattern);
    self.cache.put(pattern.to_string(), computed.clone());
    computed
}
/// Computes the uncached result for `pattern`.
///
/// Patterns containing the substring "quoted" select all triples of nesting
/// depth 1 or more; every other pattern returns `triples()`.
fn compute_pattern_results(&self, pattern: &str) -> Vec<StarTriple> {
    let wants_quoted = pattern.contains("quoted");
    if !wants_quoted {
        return self.triples();
    }
    self.find_triples_by_nesting_depth(1, None)
}
/// Collects a combined snapshot of the basic, cache and index statistics plus
/// the current bulk-insert and memory-mapping state.
pub fn get_detailed_statistics(&self) -> DetailedStorageStatistics {
    let basic_stats = self.statistics();
    let cache_stats = self.cache.get_statistics();
    // The index guard is a temporary and is released right after the call.
    let index_stats = self
        .quoted_triple_index
        .read()
        .unwrap_or_else(|e| e.into_inner())
        .get_statistics();

    let bulk = self
        .bulk_insert_state
        .read()
        .unwrap_or_else(|e| e.into_inner());
    let mapping = self.memory_mapped.read().unwrap_or_else(|e| e.into_inner());

    DetailedStorageStatistics {
        basic_stats,
        cache_stats,
        index_stats,
        bulk_insert_active: bulk.active,
        bulk_pending_count: bulk.pending_triples.len(),
        bulk_memory_usage: bulk.current_memory_usage,
        memory_mapped_enabled: mapping.enabled,
        memory_mapped_path: mapping.file_path.clone(),
    }
}
/// Creates a `ConnectionPool` from `max_connections` and `config`.
///
/// This is a thin wrapper around `ConnectionPool::new`.
pub fn create_connection_pool(max_connections: usize, config: StarConfig) -> ConnectionPool {
ConnectionPool::new(max_connections, config)
}
/// Reports a size figure for "compressed" storage: the triple count times 50.
///
/// NOTE(review): no data is actually compressed or modified here; the
/// returned value is derived purely from `len()`.
pub fn compress_storage(&self) -> StarResult<usize> {
    let span = span!(Level::INFO, "compress_storage");
    let _enter = span.enter();

    let stored = self.len();
    info!("Compressed storage for {} triples", stored);
    let reported = stored * 50;
    Ok(reported)
}
}
/// Combined statistics snapshot returned by `StarStore::get_detailed_statistics`.
#[derive(Debug, Clone)]
pub struct DetailedStorageStatistics {
/// Basic store statistics, as returned by `StarStore::statistics`.
pub basic_stats: StarStatistics,
/// Statistics reported by the store's cache.
pub cache_stats: CacheStatistics,
/// Statistics reported by the quoted triple index.
pub index_stats: IndexStatistics,
/// Value of the bulk-insert state's `active` flag at snapshot time.
pub bulk_insert_active: bool,
/// Number of triples in the bulk-insert pending list at snapshot time.
pub bulk_pending_count: usize,
/// The bulk-insert state's recorded `current_memory_usage` at snapshot time.
pub bulk_memory_usage: usize,
/// Value of the memory-mapped state's `enabled` flag at snapshot time.
pub memory_mapped_enabled: bool,
/// Copy of the memory-mapped state's file path, if one is set.
pub memory_mapped_path: Option<String>,
}
/// Iterator over a store's triples that buffers them one chunk at a time.
pub struct StreamingTripleIterator<'a> {
/// Store the triples are read from.
store: &'a StarStore,
/// Maximum number of triples loaded per chunk (the constructor clamps this to at least 1).
chunk_size: usize,
/// Buffer holding the chunk currently being yielded.
current_chunk: Vec<StarTriple>,
/// Position of the next triple to yield within `current_chunk`.
current_index: usize,
/// Total number of triples yielded so far; also the offset at which the next chunk starts.
total_processed: usize,
}
impl<'a> StreamingTripleIterator<'a> {
    /// Creates an iterator over `store` that buffers up to `chunk_size`
    /// triples at a time.
    ///
    /// A `chunk_size` of zero is raised to one so that every refill can make
    /// progress.
    fn new(store: &'a StarStore, chunk_size: usize) -> Self {
        Self {
            store,
            chunk_size: chunk_size.max(1),
            current_chunk: Vec::new(),
            current_index: 0,
            total_processed: 0,
        }
    }

    /// Refills `current_chunk` with the next batch of triples.
    ///
    /// The batch starts at offset `total_processed` and holds at most
    /// `chunk_size` triples. Returns `true` when at least one triple was
    /// loaded, and `false` when the offset is already at or past the end of
    /// the store (in which case the buffer is left untouched).
    ///
    /// NOTE(review): `all_triples()` is called on every refill; if it returns
    /// a full copy of the store, each refill costs time proportional to the
    /// store size — confirm against its implementation.
    fn load_next_chunk(&mut self) -> bool {
        let all_triples = self.store.all_triples();
        let start = self.total_processed;
        // Check exhaustion before doing any arithmetic on the chunk bounds:
        // computing `start + chunk_size` up front overflows for very large
        // chunk sizes (e.g. `usize::MAX`) once `start > 0`.
        if start >= all_triples.len() {
            return false;
        }
        self.current_chunk.clear();
        // `take` stops at the end of the data on its own, so no explicit end
        // index (and therefore no overflowing addition) is required.
        self.current_chunk.extend(
            all_triples
                .iter()
                .skip(start)
                .take(self.chunk_size)
                .cloned(),
        );
        self.current_index = 0;
        !self.current_chunk.is_empty()
    }
}
impl<'a> Iterator for StreamingTripleIterator<'a> {
    type Item = StarTriple;

    /// Yields the next triple, refilling the internal buffer when it has been
    /// fully consumed.
    ///
    /// Returns `None` once a refill reports that no further triples are
    /// available.
    fn next(&mut self) -> Option<Self::Item> {
        let buffer_spent = self.current_index >= self.current_chunk.len();
        if buffer_spent {
            let refilled = self.load_next_chunk();
            if !refilled {
                return None;
            }
        }
        // Counters only advance when a triple is actually handed out.
        let item = self.current_chunk.get(self.current_index)?.clone();
        self.current_index += 1;
        self.total_processed += 1;
        Some(item)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::StarTerm;

    /// A freshly created store reports itself as empty.
    #[test]
    fn test_store_creation() -> StarResult<()> {
        let store = StarStore::new();
        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
        Ok(())
    }

    /// Insert, membership check, subject query and removal of a plain triple.
    #[test]
    fn test_basic_operations() -> StarResult<()> {
        let store = StarStore::new();
        let triple = StarTriple::new(
            StarTerm::iri("http://example.org/alice")?,
            StarTerm::iri("http://example.org/knows")?,
            StarTerm::iri("http://example.org/bob")?,
        );
        store.insert(&triple)?;
        assert_eq!(store.len(), 1);
        assert!(store.contains(&triple));
        let results = store.query_triples(
            Some(&StarTerm::iri("http://example.org/alice")?),
            None,
            None,
        )?;
        assert_eq!(results.len(), 1);
        assert!(store.remove(&triple)?);
        assert!(store.is_empty());
        Ok(())
    }

    /// A triple whose subject is a quoted triple can be found via the inner triple.
    #[test]
    fn test_quoted_triple_operations() -> StarResult<()> {
        let store = StarStore::new();
        let inner = StarTriple::new(
            StarTerm::iri("http://example.org/alice")?,
            StarTerm::iri("http://example.org/age")?,
            StarTerm::literal("25")?,
        );
        let outer = StarTriple::new(
            StarTerm::quoted_triple(inner.clone()),
            StarTerm::iri("http://example.org/certainty")?,
            StarTerm::literal("0.9")?,
        );
        store.insert(&outer)?;
        assert_eq!(store.len(), 1);
        let containing = store.find_triples_containing_quoted(&inner);
        assert_eq!(containing.len(), 1);
        assert_eq!(containing[0], outer);
        Ok(())
    }

    /// Statistics count quoted triples and track the deepest nesting seen.
    #[test]
    fn test_store_statistics() -> StarResult<()> {
        let store = StarStore::new();
        let regular = StarTriple::new(
            StarTerm::iri("http://example.org/s")?,
            StarTerm::iri("http://example.org/p")?,
            StarTerm::iri("http://example.org/o")?,
        );
        let quoted = StarTriple::new(
            StarTerm::quoted_triple(regular.clone()),
            StarTerm::iri("http://example.org/certainty")?,
            StarTerm::literal("high")?,
        );
        store.insert(&regular)?;
        store.insert(&quoted)?;
        let stats = store.statistics();
        assert_eq!(stats.quoted_triples_count, 1);
        assert_eq!(stats.max_nesting_encountered, 1);
        Ok(())
    }

    /// Quoted-pattern and nesting-depth lookups over two triples that quote
    /// the same base triple (once as subject, once as object).
    #[test]
    fn test_btree_indexing_performance() -> StarResult<()> {
        let store = StarStore::new();
        let base_triple = StarTriple::new(
            StarTerm::iri("http://example.org/alice")?,
            StarTerm::iri("http://example.org/age")?,
            StarTerm::literal("25")?,
        );
        let quoted1 = StarTriple::new(
            StarTerm::quoted_triple(base_triple.clone()),
            StarTerm::iri("http://example.org/certainty")?,
            StarTerm::literal("0.9")?,
        );
        let quoted2 = StarTriple::new(
            StarTerm::iri("http://example.org/bob")?,
            StarTerm::iri("http://example.org/believes")?,
            StarTerm::quoted_triple(base_triple.clone()),
        );
        store.insert(&quoted1)?;
        store.insert(&quoted2)?;
        let results = store.find_triples_by_quoted_pattern(
            Some(&StarTerm::iri("http://example.org/alice")?),
            None,
            None,
        );
        assert_eq!(results.len(), 2);
        let shallow_results = store.find_triples_by_nesting_depth(0, Some(0));
        assert_eq!(shallow_results.len(), 0);
        let depth_1_results = store.find_triples_by_nesting_depth(1, Some(1));
        assert_eq!(depth_1_results.len(), 2);
        Ok(())
    }

    /// Round trip: import a graph into the store, then export it again.
    #[test]
    fn test_graph_import_export() -> StarResult<()> {
        let store = StarStore::new();
        let mut graph = StarGraph::new();
        let triple = StarTriple::new(
            StarTerm::iri("http://example.org/s")?,
            StarTerm::iri("http://example.org/p")?,
            StarTerm::iri("http://example.org/o")?,
        );
        graph.insert(triple.clone())?;
        store.from_graph(&graph)?;
        assert_eq!(store.len(), 1);
        assert!(store.contains(&triple));
        let exported = store.to_graph();
        assert_eq!(exported.len(), 1);
        assert!(exported.contains(&triple));
        Ok(())
    }

    /// The streaming iterator yields every stored triple for chunk sizes
    /// smaller than, equal to and larger than the store size.
    #[test]
    fn test_streaming_iterator() -> StarResult<()> {
        let store = StarStore::new();
        for i in 0..100 {
            let triple = StarTriple::new(
                StarTerm::iri(&format!("http://example.org/s{i}"))?,
                StarTerm::iri("http://example.org/p")?,
                StarTerm::iri(&format!("http://example.org/o{i}"))?,
            );
            store.insert(&triple)?;
        }
        for chunk_size in [1, 10, 50, 100, 200] {
            let count = store.streaming_iter(chunk_size).count();
            assert_eq!(
                count, 100,
                "Streaming iterator with chunk size {chunk_size} should return all triples"
            );
        }
        let regular_triples: Vec<_> = store.iter().collect();
        let streaming_triples: Vec<_> = store.streaming_iter(25).collect();
        assert_eq!(regular_triples.len(), streaming_triples.len());
        for triple in &regular_triples {
            assert!(streaming_triples.contains(triple));
        }
        Ok(())
    }
}