use std::sync::{Arc, RwLock};
use std::time::Instant;
use oxirs_core::rdf_store::{ConcreteStore as CoreStore, Store};
use tracing::{debug, info, span, Level};
use crate::model::{StarTerm, StarTriple};
use crate::store::{cache_mod, conversion, index};
use crate::{StarConfig, StarError, StarResult, StarStatistics};
use cache_mod::{CacheConfig, StarCache};
use index::QuotedTripleIndex;
/// Handle onto an RDF-star triple store.
///
/// Triples that contain quoted triples are kept in `star_triples` and indexed through
/// `quoted_triple_index`; triples without quoted triples are delegated to `core_store`.
///
/// Every piece of mutable state is wrapped in an `Arc`, so the derived `Clone` produces
/// another handle onto the same underlying data. `config` is the one exception: it is a
/// plain value and is copied into each clone.
#[derive(Clone)]
pub struct StarStore {
/// Core RDF store that receives triples without quoted triples.
pub(crate) core_store: Arc<RwLock<CoreStore>>,
/// Triples that contain at least one quoted triple, in insertion order.
pub(crate) star_triples: Arc<RwLock<Vec<StarTriple>>>,
/// Index over `star_triples`; entries refer to positions in that vector.
pub(crate) quoted_triple_index: Arc<RwLock<QuotedTripleIndex>>,
/// Configuration for this handle (e.g. `max_nesting_depth` used during insertion).
pub(crate) config: StarConfig,
/// Running statistics updated by `insert` and reset by `clear`.
pub(crate) statistics: Arc<RwLock<StarStatistics>>,
/// Cache built from `CacheConfig::default()`.
/// NOTE(review): not read or written in this part of the file — confirm usage elsewhere.
pub(crate) cache: Arc<StarCache>,
/// Bulk-insert bookkeeping.
/// NOTE(review): not used in this part of the file — confirm usage elsewhere.
pub(crate) bulk_insert_state: Arc<RwLock<BulkInsertState>>,
/// Memory-mapped storage bookkeeping.
/// NOTE(review): not used in this part of the file — confirm usage elsewhere.
pub(crate) memory_mapped: Arc<RwLock<MemoryMappedState>>,
}
/// Bookkeeping held in `StarStore::bulk_insert_state`.
///
/// `Default` yields an inactive state with no pending triples and zeroed counters.
///
/// NOTE(review): nothing in this part of the file reads or writes these fields; the
/// per-field notes are inferred from the names and should be confirmed against the
/// bulk-insert implementation.
#[derive(Debug, Default)]
pub(crate) struct BulkInsertState {
/// Presumably `true` while a bulk insert is in progress — TODO confirm.
pub(crate) active: bool,
/// Presumably triples buffered but not yet committed to the store — TODO confirm.
pub(crate) pending_triples: Vec<StarTriple>,
/// Presumably an estimate of the memory held by the buffer; unit not visible here — TODO confirm.
pub(crate) current_memory_usage: usize,
/// Presumably the number of batches processed so far — TODO confirm.
pub(crate) batch_count: usize,
}
/// Bookkeeping held in `StarStore::memory_mapped`.
///
/// `Default` yields a disabled state with no file path and no recorded sync time.
///
/// NOTE(review): nothing in this part of the file reads or writes these fields; the
/// per-field notes are inferred from the names and should be confirmed against the
/// memory-mapping implementation.
#[derive(Debug, Default)]
pub(crate) struct MemoryMappedState {
/// Presumably whether memory-mapped storage is turned on — TODO confirm.
pub(crate) enabled: bool,
/// Presumably the path of the backing file, if one has been set — TODO confirm.
pub(crate) file_path: Option<String>,
/// Presumably whether the backing data is compressed — TODO confirm.
pub(crate) compression_enabled: bool,
/// Presumably the time of the most recent sync to the backing file — TODO confirm.
pub(crate) last_sync: Option<Instant>,
}
impl StarStore {
/// Creates an empty store configured with `StarConfig::default()`.
///
/// This is a thin wrapper around `with_config`, so it panics in the same situation:
/// when the core store cannot be created.
pub fn new() -> Self {
    let default_config = StarConfig::default();
    Self::with_config(default_config)
}
/// Creates an empty store that uses `config`.
///
/// The core store, the star-triple list, the quoted-triple index, the statistics and the
/// bulk-insert / memory-mapped state all start out empty or at their default values. The
/// cache is built from `CacheConfig::default()`, independently of `config`.
///
/// # Panics
///
/// Panics if `CoreStore::new()` returns an error.
pub fn with_config(config: StarConfig) -> Self {
    let span = span!(Level::INFO, "new_star_store");
    let _enter = span.enter();
    info!("Creating new RDF-star store with optimizations");
    debug!("Configuration: {:?}", config);
    Self {
        core_store: Arc::new(RwLock::new(
            CoreStore::new().expect("Failed to create core store"),
        )),
        star_triples: Arc::new(RwLock::new(Vec::new())),
        quoted_triple_index: Arc::new(RwLock::new(QuotedTripleIndex::new())),
        // `config` is owned and not used again, so it is moved in rather than cloned.
        config,
        statistics: Arc::new(RwLock::new(StarStatistics::default())),
        cache: Arc::new(StarCache::new(CacheConfig::default())),
        bulk_insert_state: Arc::new(RwLock::new(BulkInsertState::default())),
        memory_mapped: Arc::new(RwLock::new(MemoryMappedState::default())),
    }
}
pub fn config(&self) -> &StarConfig {
&self.config
}
/// Inserts `triple` into the store.
///
/// The triple is first checked with `StarTriple::validate`, then each of its three terms
/// is checked against `config.max_nesting_depth`. A triple that contains quoted triples is
/// appended to the star-triple list and indexed; any other triple is written to the core
/// store.
///
/// On success the statistics are updated: the elapsed time is added to
/// `processing_time_us`, and for a triple with quoted triples `quoted_triples_count` is
/// incremented and `max_nesting_encountered` is raised if needed. Statistics are left
/// untouched when the insert fails.
///
/// # Errors
///
/// Returns the validation error if the triple is rejected, or the error produced by the
/// underlying star / core insert.
pub fn insert(&self, triple: &StarTriple) -> StarResult<()> {
    let span = span!(Level::DEBUG, "insert_triple");
    let _enter = span.enter();
    let start_time = Instant::now();

    triple.validate()?;
    for term in [&triple.subject, &triple.predicate, &triple.object] {
        crate::validate_nesting_depth(term, self.config.max_nesting_depth)?;
    }

    // Computed once: this walks the triple's terms and the answer is needed again below.
    let has_quoted = triple.contains_quoted_triples();
    debug!("Triple contains quoted triples: {}", has_quoted);
    if has_quoted {
        self.insert_star_triple(triple)?;
    } else {
        self.insert_regular_triple(triple)?;
    }

    {
        let mut stats = self.statistics.write().unwrap_or_else(|e| e.into_inner());
        // `as_micros` yields a u128; clamp instead of silently truncating.
        let elapsed_us = u64::try_from(start_time.elapsed().as_micros()).unwrap_or(u64::MAX);
        stats.processing_time_us += elapsed_us;
        if has_quoted {
            stats.quoted_triples_count += 1;
            stats.max_nesting_encountered =
                stats.max_nesting_encountered.max(triple.nesting_depth());
        }
    }

    debug!("Inserted triple: {}", triple);
    Ok(())
}
pub(crate) fn insert_regular_triple(&self, triple: &StarTriple) -> StarResult<()> {
eprintln!("DEBUG: Inserting regular triple into core store");
let core_triple = self.convert_to_core_triple(triple)?;
eprintln!("DEBUG: Converted to core triple successfully");
let core_quad = oxirs_core::model::Quad::from_triple(core_triple);
eprintln!("DEBUG: Created core quad for insertion: {core_quad:?}");
let core_store = self.core_store.write().unwrap_or_else(|e| e.into_inner());
let result = CoreStore::insert_quad(&core_store, core_quad).map_err(StarError::CoreError);
eprintln!("DEBUG: Core store insert result: {result:?}");
result?;
eprintln!("DEBUG: Successfully inserted regular triple");
Ok(())
}
pub(crate) fn convert_to_core_triple(
&self,
triple: &StarTriple,
) -> StarResult<oxirs_core::model::Triple> {
let subject = conversion::star_term_to_subject(&triple.subject)?;
let predicate = conversion::star_term_to_predicate(&triple.predicate)?;
let object = conversion::star_term_to_object(&triple.object)?;
Ok(oxirs_core::model::Triple::new(subject, predicate, object))
}
/// Appends a triple containing quoted triples to the star-triple list and indexes it.
///
/// The write locks are taken in the order `star_triples`, then `quoted_triple_index`, and
/// both are held until the function returns. The triple's position in the list is what
/// gets recorded in the index.
pub(crate) fn insert_star_triple(&self, triple: &StarTriple) -> StarResult<()> {
    let mut stored = self
        .star_triples
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let mut quoted_index = self
        .quoted_triple_index
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    // The new element lands at the current length of the vector.
    let slot = stored.len();
    stored.push(triple.clone());
    self.index_quoted_triples(triple, slot, &mut quoted_index);

    debug!(
        "Inserted star triple with {} quoted triples",
        self.count_quoted_triples_in_triple(triple)
    );
    Ok(())
}
pub fn remove(&self, triple: &StarTriple) -> StarResult<bool> {
let span = span!(Level::DEBUG, "remove_triple");
let _enter = span.enter();
eprintln!("DEBUG: Attempting to remove triple: {triple}");
eprintln!(
"DEBUG: Triple contains quoted triples: {}",
triple.contains_quoted_triples()
);
if triple.contains_quoted_triples() {
let mut star_triples = self.star_triples.write().unwrap_or_else(|e| e.into_inner());
if let Some(pos) = star_triples.iter().position(|t| t == triple) {
star_triples.remove(pos);
let mut index = self
.quoted_triple_index
.write()
.unwrap_or_else(|e| e.into_inner());
for (_, indices) in index.signature_to_indices.iter_mut() {
Self::update_indices_after_removal(indices, pos);
}
for (_, indices) in index.subject_index.iter_mut() {
Self::update_indices_after_removal(indices, pos);
}
for (_, indices) in index.predicate_index.iter_mut() {
Self::update_indices_after_removal(indices, pos);
}
for (_, indices) in index.object_index.iter_mut() {
Self::update_indices_after_removal(indices, pos);
}
for (_, indices) in index.nesting_depth_index.iter_mut() {
Self::update_indices_after_removal(indices, pos);
}
debug!("Removed star triple: {}", triple);
return Ok(true);
}
} else {
eprintln!("DEBUG: Attempting to remove regular triple from core store");
let core_store = self.core_store.write().unwrap_or_else(|e| e.into_inner());
if let Ok(core_triple) = self.convert_to_core_triple(triple) {
eprintln!("DEBUG: Successfully converted to core triple");
let core_quad = oxirs_core::model::Quad::from_triple(core_triple);
eprintln!("DEBUG: Created core quad: {core_quad:?}");
match CoreStore::remove_quad(&core_store, &core_quad) {
Ok(removed) => {
eprintln!("DEBUG: Core store remove_quad returned: {removed}");
if removed {
eprintln!("DEBUG: Removed regular triple: {triple}");
return Ok(true);
} else {
eprintln!(
"DEBUG: Core store remove_quad returned false - triple not found"
);
}
}
Err(e) => {
eprintln!("DEBUG: Core store remove_quad failed with error: {e:?}");
}
}
} else {
eprintln!("DEBUG: Failed to convert triple to core triple");
}
}
Ok(false)
}
/// Reports whether `triple` is present in the store.
///
/// The star-triple list is always searched first. If the triple is not there and it
/// contains no quoted triples, the core store is queried for an exactly matching quad.
/// A conversion failure or a core store lookup error is reported as `false`.
pub fn contains(&self, triple: &StarTriple) -> bool {
    let star_guard = self
        .star_triples
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    if star_guard.iter().any(|stored| stored == triple) {
        return true;
    }
    // Triples with quoted triples are never looked up in the core store.
    if triple.contains_quoted_triples() {
        return false;
    }

    let core_guard = self
        .core_store
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let core_triple = match self.convert_to_core_triple(triple) {
        Ok(converted) => converted,
        Err(_) => return false,
    };
    let probe = oxirs_core::model::Quad::from_triple(core_triple);
    let lookup = core_guard.find_quads(
        Some(probe.subject()),
        Some(probe.predicate()),
        Some(probe.object()),
        Some(probe.graph_name()),
    );
    match lookup {
        Ok(matches) => !matches.is_empty(),
        Err(_) => false,
    }
}
/// Returns the total number of stored triples.
///
/// This is the core store's length plus the number of star triples. If the core store
/// fails to report its length, it is counted as zero.
pub fn len(&self) -> usize {
    let star_guard = self
        .star_triples
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let core_guard = self
        .core_store
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    core_guard.len().unwrap_or(0) + star_guard.len()
}
/// Reports whether the store holds no triples at all.
///
/// The core store is consulted only when the star-triple list is empty; if the core
/// store fails to answer, it is treated as empty.
pub fn is_empty(&self) -> bool {
    let star_guard = self
        .star_triples
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let core_guard = self
        .core_store
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    if !star_guard.is_empty() {
        return false;
    }
    core_guard.is_empty().unwrap_or(true)
}
/// Removes every triple and resets the statistics.
///
/// The star-triple list and the quoted-triple index are emptied, the core store is
/// replaced by a freshly created one, and the statistics are reset to their defaults.
/// Each piece of state is updated under its own write lock, one after another.
///
/// # Errors
///
/// Returns `StarError::CoreError` if a replacement core store cannot be created. In that
/// case nothing has been modified.
pub fn clear(&self) -> StarResult<()> {
    let span = span!(Level::INFO, "clear_store");
    let _enter = span.enter();

    // Creating the replacement store is the only fallible step, so do it before touching
    // any state. A failure then leaves the store intact instead of half-cleared.
    let fresh_core = CoreStore::new().map_err(StarError::CoreError)?;

    {
        let mut star_triples = self.star_triples.write().unwrap_or_else(|e| e.into_inner());
        star_triples.clear();
    }
    {
        let mut core_store = self.core_store.write().unwrap_or_else(|e| e.into_inner());
        *core_store = fresh_core;
    }
    {
        let mut index = self
            .quoted_triple_index
            .write()
            .unwrap_or_else(|e| e.into_inner());
        index.clear();
    }
    {
        let mut stats = self.statistics.write().unwrap_or_else(|e| e.into_inner());
        *stats = StarStatistics::default();
    }

    info!("Cleared all triples from store");
    Ok(())
}
pub fn statistics(&self) -> StarStatistics {
let stats = self.statistics.read().unwrap_or_else(|e| e.into_inner());
stats.clone()
}
/// Tests `triple` against an optional subject / predicate / object pattern.
///
/// A `None` component acts as a wildcard. The triple matches only when every supplied
/// component equals the corresponding term; components are checked in the order
/// subject, predicate, object and checking stops at the first mismatch.
pub(crate) fn triple_matches(
    &self,
    triple: &StarTriple,
    subject: Option<&StarTerm>,
    predicate: Option<&StarTerm>,
    object: Option<&StarTerm>,
) -> bool {
    // A missing constraint is satisfied by any term.
    let agrees = |actual: &StarTerm, wanted: Option<&StarTerm>| {
        wanted.map_or(true, |expected| actual == expected)
    };
    agrees(&triple.subject, subject)
        && agrees(&triple.predicate, predicate)
        && agrees(&triple.object, object)
}
/// Replaces this handle's configuration with `config`.
///
/// A copy of `config` is first passed to `crate::init_star_system`; the stored
/// configuration is replaced only if that call succeeds. `config` is a plain field rather
/// than shared state, so other clones of this store keep their own configuration.
///
/// # Errors
///
/// Returns the error from `crate::init_star_system`, leaving the current configuration
/// in place.
pub fn update_config(&mut self, config: StarConfig) -> StarResult<()> {
    let init_outcome = crate::init_star_system(config.clone());
    init_outcome?;
    // Swap in the new value and drop the previous configuration right away.
    drop(std::mem::replace(&mut self.config, config));
    Ok(())
}
}
impl Default for StarStore {
fn default() -> Self {
Self::new()
}
}