use super::sink::TripleSink;
use super::term::Term;
use super::triple::{Triple, TriplePattern};
use grafeo_common::types::TransactionId;
use grafeo_common::utils::hash::FxHashSet;
use hashbrown::HashMap;
use parking_lot::RwLock;
use std::sync::Arc;
/// A buffered write recorded against a transaction, applied (in order)
/// only when the transaction commits.
#[derive(Debug, Clone)]
enum PendingOp {
Insert(Triple),
Delete(Triple),
}
/// Per-transaction staging area: uncommitted operations keyed by
/// transaction id, in the order they were recorded.
#[derive(Debug, Default)]
struct TransactionBuffer {
buffers: HashMap<TransactionId, Vec<PendingOp>>,
}
/// Construction-time options for [`RdfStore`].
#[derive(Debug, Clone)]
pub struct RdfStoreConfig {
/// Initial capacity used to pre-size each index map.
pub initial_capacity: usize,
/// When false, the object index is not maintained and object lookups
/// fall back to a full scan of the triple set.
pub index_objects: bool,
}
impl Default for RdfStoreConfig {
/// Defaults: 1024 initial slots per index, object indexing enabled.
fn default() -> Self {
Self {
initial_capacity: 1024,
index_objects: true,
}
}
}
/// Thread-safe in-memory RDF triple store with secondary indexes for every
/// bound/unbound lookup pattern, optional named graphs, and per-transaction
/// write buffering. All indexes hold `Arc` clones of entries in `triples`.
pub struct RdfStore {
config: RdfStoreConfig,
/// Primary set of all triples in the default graph.
triples: RwLock<FxHashSet<Arc<Triple>>>,
/// S -> triples index (pattern: subject bound only).
subject_index: RwLock<hashbrown::HashMap<Term, Vec<Arc<Triple>>, foldhash::fast::RandomState>>,
/// P -> triples index (pattern: predicate bound only).
predicate_index:
RwLock<hashbrown::HashMap<Term, Vec<Arc<Triple>>, foldhash::fast::RandomState>>,
/// O -> triples index; `None` when `config.index_objects` is false.
object_index:
RwLock<Option<hashbrown::HashMap<Term, Vec<Arc<Triple>>, foldhash::fast::RandomState>>>,
/// (S, P) -> triples index.
sp_index:
RwLock<hashbrown::HashMap<(Term, Term), Vec<Arc<Triple>>, foldhash::fast::RandomState>>,
/// (P, O) -> triples index.
po_index:
RwLock<hashbrown::HashMap<(Term, Term), Vec<Arc<Triple>>, foldhash::fast::RandomState>>,
/// (O, S) -> triples index.
os_index:
RwLock<hashbrown::HashMap<(Term, Term), Vec<Arc<Triple>>, foldhash::fast::RandomState>>,
/// Uncommitted per-transaction write buffers.
tx_buffer: RwLock<TransactionBuffer>,
/// Named graphs; each is an independent `RdfStore` sharing this config.
named_graphs: RwLock<HashMap<String, Arc<RdfStore>>>,
/// Lazily built statistics, invalidated on every mutation.
statistics_cache: RwLock<Option<Arc<crate::statistics::RdfStatistics>>>,
/// Lazily built term dictionary, invalidated on every mutation.
dictionary_cache: RwLock<Option<Arc<super::dictionary::TermDictionary>>>,
#[cfg(feature = "ring-index")]
ring: RwLock<Option<Arc<crate::index::ring::TripleRing>>>,
/// Set when the ring no longer reflects the triple set.
#[cfg(feature = "ring-index")]
ring_stale: std::sync::atomic::AtomicBool,
}
impl RdfStore {
/// Creates a store with [`RdfStoreConfig::default`] settings.
pub fn new() -> Self {
Self::with_config(RdfStoreConfig::default())
}
pub fn with_config(config: RdfStoreConfig) -> Self {
let object_index = if config.index_objects {
Some(hashbrown::HashMap::with_capacity_and_hasher(
config.initial_capacity,
foldhash::fast::RandomState::default(),
))
} else {
None
};
Self {
triples: RwLock::new(FxHashSet::default()),
subject_index: RwLock::new(hashbrown::HashMap::with_capacity_and_hasher(
config.initial_capacity,
foldhash::fast::RandomState::default(),
)),
predicate_index: RwLock::new(hashbrown::HashMap::with_capacity_and_hasher(
config.initial_capacity,
foldhash::fast::RandomState::default(),
)),
object_index: RwLock::new(object_index),
sp_index: RwLock::new(hashbrown::HashMap::with_capacity_and_hasher(
config.initial_capacity,
foldhash::fast::RandomState::default(),
)),
po_index: RwLock::new(hashbrown::HashMap::with_capacity_and_hasher(
config.initial_capacity,
foldhash::fast::RandomState::default(),
)),
os_index: RwLock::new(hashbrown::HashMap::with_capacity_and_hasher(
config.initial_capacity,
foldhash::fast::RandomState::default(),
)),
tx_buffer: RwLock::new(TransactionBuffer::default()),
named_graphs: RwLock::new(HashMap::new()),
statistics_cache: RwLock::new(None),
dictionary_cache: RwLock::new(None),
#[cfg(feature = "ring-index")]
ring: RwLock::new(None),
#[cfg(feature = "ring-index")]
ring_stale: std::sync::atomic::AtomicBool::new(false),
config,
}
}
/// Inserts a triple into the default graph, returning `true` if it was
/// newly added.
///
/// Fast path: a read lock checks for an existing duplicate before any
/// write lock is taken; the write-side `insert` re-checks, so two racing
/// writers still produce exactly one `true`.
///
/// Secondary indexes are updated after the primary set, each under its own
/// short-lived write lock, so a concurrent reader may briefly observe the
/// triple in the primary set before it appears in every index.
pub fn insert(&self, triple: Triple) -> bool {
let triple = Arc::new(triple);
{
// Cheap duplicate check under a read lock only.
let triples = self.triples.read();
if triples.contains(&triple) {
return false;
}
}
{
// Authoritative insert; re-check covers the read/write race window.
let mut triples = self.triples.write();
if !triples.insert(Arc::clone(&triple)) {
return false;
}
}
{
let mut subject_index = self.subject_index.write();
subject_index
.entry(triple.subject().clone())
.or_default()
.push(Arc::clone(&triple));
}
{
let mut predicate_index = self.predicate_index.write();
predicate_index
.entry(triple.predicate().clone())
.or_default()
.push(Arc::clone(&triple));
}
if self.config.index_objects {
let mut object_index = self.object_index.write();
if let Some(ref mut index) = *object_index {
index
.entry(triple.object().clone())
.or_default()
.push(Arc::clone(&triple));
}
}
{
// Pair indexes (SP, PO, OS) serve the two-bound-term patterns in find().
let mut sp = self.sp_index.write();
sp.entry((triple.subject().clone(), triple.predicate().clone()))
.or_default()
.push(Arc::clone(&triple));
}
{
let mut po = self.po_index.write();
po.entry((triple.predicate().clone(), triple.object().clone()))
.or_default()
.push(Arc::clone(&triple));
}
{
let mut os = self.os_index.write();
os.entry((triple.object().clone(), triple.subject().clone()))
.or_default()
.push(triple);
}
self.invalidate_statistics_cache();
true
}
/// Inserts many triples at once, returning how many were newly added.
///
/// Unlike repeated [`RdfStore::insert`] calls, each lock is acquired once
/// for the whole batch: the primary set first, then each secondary index
/// in the same order `insert` uses. Triples already present are skipped
/// and not counted.
pub fn batch_insert(&self, triples: impl IntoIterator<Item = Triple>) -> usize {
    // Phase 1: dedupe against the primary set under a single write lock.
    let mut new_triples = Vec::new();
    {
        let mut primary = self.triples.write();
        for triple in triples {
            let arc = Arc::new(triple);
            if primary.insert(Arc::clone(&arc)) {
                new_triples.push(arc);
            }
        }
    }
    if new_triples.is_empty() {
        return 0;
    }
    let count = new_triples.len();
    // Phase 2: fold the survivors into each secondary index, one lock each.
    {
        let mut subject_index = self.subject_index.write();
        for triple in &new_triples {
            subject_index
                .entry(triple.subject().clone())
                .or_default()
                .push(Arc::clone(triple));
        }
    }
    {
        let mut predicate_index = self.predicate_index.write();
        for triple in &new_triples {
            predicate_index
                .entry(triple.predicate().clone())
                .or_default()
                .push(Arc::clone(triple));
        }
    }
    if self.config.index_objects {
        let mut object_index = self.object_index.write();
        if let Some(ref mut index) = *object_index {
            for triple in &new_triples {
                index
                    .entry(triple.object().clone())
                    .or_default()
                    .push(Arc::clone(triple));
            }
        }
    }
    {
        let mut sp = self.sp_index.write();
        for triple in &new_triples {
            sp.entry((triple.subject().clone(), triple.predicate().clone()))
                .or_default()
                .push(Arc::clone(triple));
        }
    }
    {
        let mut po = self.po_index.write();
        for triple in &new_triples {
            po.entry((triple.predicate().clone(), triple.object().clone()))
                .or_default()
                .push(Arc::clone(triple));
        }
    }
    {
        let mut os = self.os_index.write();
        for triple in new_triples {
            os.entry((triple.object().clone(), triple.subject().clone()))
                .or_default()
                .push(triple);
        }
    }
    // `count` is always non-zero here (empty batches returned early above),
    // so the derived caches are unconditionally stale; the old
    // `if count > 0` guard was dead code.
    self.invalidate_statistics_cache();
    count
}
pub fn remove(&self, triple: &Triple) -> bool {
let removed = {
let mut triples = self.triples.write();
triples.remove(triple)
};
if !removed {
return false;
}
{
let mut subject_index = self.subject_index.write();
if let Some(vec) = subject_index.get_mut(triple.subject()) {
vec.retain(|t| t.as_ref() != triple);
if vec.is_empty() {
subject_index.remove(triple.subject());
}
}
}
{
let mut predicate_index = self.predicate_index.write();
if let Some(vec) = predicate_index.get_mut(triple.predicate()) {
vec.retain(|t| t.as_ref() != triple);
if vec.is_empty() {
predicate_index.remove(triple.predicate());
}
}
}
if self.config.index_objects {
let mut object_index = self.object_index.write();
if let Some(ref mut index) = *object_index
&& let Some(vec) = index.get_mut(triple.object())
{
vec.retain(|t| t.as_ref() != triple);
if vec.is_empty() {
index.remove(triple.object());
}
}
}
{
let mut sp = self.sp_index.write();
let key = (triple.subject().clone(), triple.predicate().clone());
if let Some(vec) = sp.get_mut(&key) {
vec.retain(|t| t.as_ref() != triple);
if vec.is_empty() {
sp.remove(&key);
}
}
}
{
let mut po = self.po_index.write();
let key = (triple.predicate().clone(), triple.object().clone());
if let Some(vec) = po.get_mut(&key) {
vec.retain(|t| t.as_ref() != triple);
if vec.is_empty() {
po.remove(&key);
}
}
}
{
let mut os = self.os_index.write();
let key = (triple.object().clone(), triple.subject().clone());
if let Some(vec) = os.get_mut(&key) {
vec.retain(|t| t.as_ref() != triple);
if vec.is_empty() {
os.remove(&key);
}
}
}
self.invalidate_statistics_cache();
true
}
/// Number of triples in the default graph.
#[must_use]
pub fn len(&self) -> usize {
self.triples.read().len()
}
/// `true` when the default graph holds no triples.
#[must_use]
pub fn is_empty(&self) -> bool {
self.triples.read().is_empty()
}
/// `true` when `triple` is present in the default graph.
#[must_use]
pub fn contains(&self, triple: &Triple) -> bool {
self.triples.read().contains(triple)
}
/// Snapshot of all triples in the default graph (cheap `Arc` clones).
pub fn triples(&self) -> Vec<Arc<Triple>> {
self.triples.read().iter().cloned().collect()
}
/// Finds all triples matching `pattern`, picking the cheapest index for
/// the combination of bound terms:
/// SPO -> SP index + object filter; SP -> SP; SO -> OS; PO -> PO;
/// S/P/O alone -> the single-term indexes. Patterns the indexes cannot
/// serve (nothing bound, or object-only with object indexing disabled)
/// fall back to a full scan.
pub fn find(&self, pattern: &TriplePattern) -> Vec<Arc<Triple>> {
match (&pattern.subject, &pattern.predicate, &pattern.object) {
(Some(s), Some(p), Some(o)) => {
// Fully bound: SP bucket is small; filter it by object.
let index = self.sp_index.read();
if let Some(triples) = index.get(&(s.clone(), p.clone())) {
triples
.iter()
.filter(|t| t.object() == o)
.cloned()
.collect()
} else {
Vec::new()
}
}
(Some(s), Some(p), None) => {
let index = self.sp_index.read();
index
.get(&(s.clone(), p.clone()))
.cloned()
.unwrap_or_default()
}
(Some(s), None, Some(o)) => {
// OS index is keyed (object, subject).
let index = self.os_index.read();
index
.get(&(o.clone(), s.clone()))
.cloned()
.unwrap_or_default()
}
(None, Some(p), Some(o)) => {
let index = self.po_index.read();
index
.get(&(p.clone(), o.clone()))
.cloned()
.unwrap_or_default()
}
(Some(s), None, None) => {
let index = self.subject_index.read();
index.get(s).cloned().unwrap_or_default()
}
(None, Some(p), None) => {
let index = self.predicate_index.read();
index.get(p).cloned().unwrap_or_default()
}
(None, None, Some(o)) if self.config.index_objects => {
let index = self.object_index.read();
if let Some(ref idx) = *index {
idx.get(o).cloned().unwrap_or_default()
} else {
Vec::new()
}
}
// Nothing usable bound: scan everything.
_ => self
.triples
.read()
.iter()
.filter(|t| pattern.matches(t))
.cloned()
.collect(),
}
}
/// All triples whose subject equals `subject` (indexed lookup).
pub fn triples_with_subject(&self, subject: &Term) -> Vec<Arc<Triple>> {
let index = self.subject_index.read();
index.get(subject).cloned().unwrap_or_default()
}
/// All triples whose predicate equals `predicate` (indexed lookup).
pub fn triples_with_predicate(&self, predicate: &Term) -> Vec<Arc<Triple>> {
let index = self.predicate_index.read();
index.get(predicate).cloned().unwrap_or_default()
}
/// All triples whose object equals `object`. Uses the object index when
/// it exists; otherwise (object indexing disabled) scans every triple.
pub fn triples_with_object(&self, object: &Term) -> Vec<Arc<Triple>> {
let index = self.object_index.read();
if let Some(ref idx) = *index {
idx.get(object).cloned().unwrap_or_default()
} else {
// Fallback: O(n) scan when the index is disabled.
self.triples
.read()
.iter()
.filter(|t| t.object() == object)
.cloned()
.collect()
}
}
/// Distinct subject terms in the default graph.
pub fn subjects(&self) -> Vec<Term> {
self.subject_index.read().keys().cloned().collect()
}
/// Distinct predicate terms in the default graph.
pub fn predicates(&self) -> Vec<Term> {
self.predicate_index.read().keys().cloned().collect()
}
pub fn objects(&self) -> Vec<Term> {
if self.config.index_objects {
let index = self.object_index.read();
if let Some(ref idx) = *index {
return idx.keys().cloned().collect();
}
}
let triples = self.triples.read();
let mut objects = FxHashSet::default();
for triple in triples.iter() {
objects.insert(triple.object().clone());
}
objects.into_iter().collect()
}
/// Removes every triple from the default graph and empties all indexes.
/// Named graphs and pending transaction buffers are left untouched.
pub fn clear(&self) {
self.triples.write().clear();
self.subject_index.write().clear();
self.predicate_index.write().clear();
if let Some(ref mut idx) = *self.object_index.write() {
idx.clear();
}
self.sp_index.write().clear();
self.po_index.write().clear();
self.os_index.write().clear();
self.invalidate_statistics_cache();
}
/// Cheap counters derived from index sizes. `object_count` is reported
/// as 0 when object indexing is disabled (no index to count from).
#[must_use]
pub fn stats(&self) -> RdfStoreStats {
RdfStoreStats {
triple_count: self.len(),
subject_count: self.subject_index.read().len(),
predicate_count: self.predicate_index.read().len(),
object_count: if self.config.index_objects {
self.object_index.read().as_ref().map_or(0, |i| i.len())
} else {
0
},
graph_count: self.named_graphs.read().len(),
}
}
/// Builds fresh statistics by walking every triple. Holds the triples
/// read lock for the duration; prefer `get_or_collect_statistics` to
/// reuse a cached result.
#[must_use]
pub fn collect_statistics(&self) -> crate::statistics::RdfStatistics {
let mut collector = crate::statistics::RdfStatisticsCollector::new();
let triples = self.triples.read();
for triple in triples.iter() {
collector.record_triple(
&triple.subject().to_string(),
&triple.predicate().to_string(),
&triple.object().to_string(),
);
}
collector.build()
}
/// Returns cached statistics, computing and caching them on first use.
/// Concurrent callers may both compute; the last write wins (harmless,
/// both results describe the same snapshot era).
#[must_use]
pub fn get_or_collect_statistics(&self) -> Arc<crate::statistics::RdfStatistics> {
if let Some(cached) = self.statistics_cache.read().as_ref() {
return Arc::clone(cached);
}
let stats = Arc::new(self.collect_statistics());
*self.statistics_cache.write() = Some(Arc::clone(&stats));
stats
}
/// Drops every derived cache (statistics, dictionary, and — when built
/// with the feature — the ring index). Called after any mutation.
fn invalidate_statistics_cache(&self) {
*self.statistics_cache.write() = None;
*self.dictionary_cache.write() = None;
#[cfg(feature = "ring-index")]
{
self.ring_stale
.store(true, std::sync::atomic::Ordering::Relaxed);
*self.ring.write() = None;
}
}
/// Returns the cached term dictionary, building it (sized for up to three
/// distinct terms per triple) on first use.
#[must_use]
pub fn get_or_build_dictionary(&self) -> Arc<super::dictionary::TermDictionary> {
{
// Fast path under a read lock; released before any building starts.
let cache = self.dictionary_cache.read();
if let Some(dict) = cache.as_ref() {
return Arc::clone(dict);
}
}
let triples = self.triples.read();
let mut dict =
super::dictionary::TermDictionary::with_capacity(triples.len().saturating_mul(3));
for triple in triples.iter() {
dict.get_or_insert(triple.subject());
dict.get_or_insert(triple.predicate());
dict.get_or_insert(triple.object());
}
let dict = Arc::new(dict);
*self.dictionary_cache.write() = Some(Arc::clone(&dict));
dict
}
/// The cached term dictionary, if one has been built and not invalidated.
#[must_use]
pub fn term_dictionary(&self) -> Option<Arc<super::dictionary::TermDictionary>> {
self.dictionary_cache.read().clone()
}
/// The current ring index, or `None` when none is built or it is stale
/// (i.e. the store has mutated since the ring was constructed).
#[cfg(feature = "ring-index")]
#[must_use]
pub fn ring(&self) -> Option<Arc<crate::index::ring::TripleRing>> {
if self.ring_stale.load(std::sync::atomic::Ordering::Relaxed) {
return None;
}
self.ring.read().clone()
}
/// Rebuilds the ring index from the current triple set and clears the
/// stale flag. An empty store drops the ring instead of building one.
#[cfg(feature = "ring-index")]
pub fn rebuild_ring(&self) {
let triples = self.triples.read();
if triples.is_empty() {
*self.ring.write() = None;
return;
}
let ring = crate::index::ring::TripleRing::from_triples(
triples.iter().map(|t| t.as_ref().clone()),
);
*self.ring.write() = Some(Arc::new(ring));
self.ring_stale
.store(false, std::sync::atomic::Ordering::Relaxed);
}
/// Installs an externally built ring index and marks it fresh. The caller
/// is responsible for the ring matching the store's current contents.
#[cfg(feature = "ring-index")]
pub fn set_ring(&self, ring: crate::index::ring::TripleRing) {
*self.ring.write() = Some(Arc::new(ring));
self.ring_stale
.store(false, std::sync::atomic::Ordering::Relaxed);
}
/// Replaces the entire default graph with `triples`, building all indexes
/// and statistics off-lock and then swapping them in. An empty input
/// clears the store. Named graphs and transaction buffers are untouched.
///
/// The swap is not atomic across locks: a concurrent reader may briefly
/// see a mix of old and new index state while the assignments below run.
pub fn bulk_load(&self, triples: impl IntoIterator<Item = Triple>) -> BulkLoadResult {
let arcs: Vec<Arc<Triple>> = triples.into_iter().map(Arc::new).collect();
let count = arcs.len();
if count == 0 {
self.clear();
return BulkLoadResult {
triple_count: 0,
statistics: crate::statistics::RdfStatistics::new(),
};
}
// Capacity divisors are heuristics for typical key cardinality
// relative to triple count; maps grow as needed if they undershoot.
let hasher = || foldhash::fast::RandomState::default();
let mut subject_idx = hashbrown::HashMap::with_capacity_and_hasher(count / 4, hasher());
let mut predicate_idx = hashbrown::HashMap::with_capacity_and_hasher(count / 8, hasher());
let mut object_idx_map = hashbrown::HashMap::with_capacity_and_hasher(count / 4, hasher());
let mut sp_idx = hashbrown::HashMap::with_capacity_and_hasher(count / 2, hasher());
let mut po_idx = hashbrown::HashMap::with_capacity_and_hasher(count / 2, hasher());
let mut os_idx = hashbrown::HashMap::with_capacity_and_hasher(count / 2, hasher());
let mut stats_collector = crate::statistics::RdfStatisticsCollector::new();
for triple in &arcs {
subject_idx
.entry(triple.subject().clone())
.or_insert_with(Vec::new)
.push(Arc::clone(triple));
predicate_idx
.entry(triple.predicate().clone())
.or_insert_with(Vec::new)
.push(Arc::clone(triple));
if self.config.index_objects {
object_idx_map
.entry(triple.object().clone())
.or_insert_with(Vec::new)
.push(Arc::clone(triple));
}
sp_idx
.entry((triple.subject().clone(), triple.predicate().clone()))
.or_insert_with(Vec::new)
.push(Arc::clone(triple));
po_idx
.entry((triple.predicate().clone(), triple.object().clone()))
.or_insert_with(Vec::new)
.push(Arc::clone(triple));
os_idx
.entry((triple.object().clone(), triple.subject().clone()))
.or_insert_with(Vec::new)
.push(Arc::clone(triple));
stats_collector.record_triple(
&triple.subject().to_string(),
&triple.predicate().to_string(),
&triple.object().to_string(),
);
}
// Duplicate input triples collapse here; `count` still reports the
// raw input length, not the deduplicated size.
let primary: FxHashSet<Arc<Triple>> = arcs.into_iter().collect();
*self.triples.write() = primary;
*self.subject_index.write() = subject_idx;
*self.predicate_index.write() = predicate_idx;
*self.object_index.write() = if self.config.index_objects {
Some(object_idx_map)
} else {
None
};
*self.sp_index.write() = sp_idx;
*self.po_index.write() = po_idx;
*self.os_index.write() = os_idx;
// Statistics were computed during the build, so prime the cache
// instead of invalidating it.
let stats = stats_collector.build();
*self.statistics_cache.write() = Some(Arc::new(stats.clone()));
*self.dictionary_cache.write() = None;
#[cfg(feature = "ring-index")]
{
let ring = crate::index::ring::TripleRing::from_triples(
self.triples.read().iter().map(|t| t.as_ref().clone()),
);
*self.ring.write() = Some(Arc::new(ring));
self.ring_stale
.store(false, std::sync::atomic::Ordering::Relaxed);
}
BulkLoadResult {
triple_count: count,
statistics: stats,
}
}
/// Parses an N-Triples document from `reader` and bulk-loads it,
/// replacing the default graph's current contents.
///
/// Blank lines and `#` comment lines are skipped. Reported error line
/// numbers are 1-based.
///
/// # Errors
/// `NTriplesError::Io` on read failure, `NTriplesError::Parse` on the
/// first malformed line.
pub fn load_ntriples(
&self,
reader: impl std::io::BufRead,
) -> Result<BulkLoadResult, NTriplesError> {
let mut triples = Vec::new();
for (line_no, line) in reader.lines().enumerate() {
let line = line.map_err(NTriplesError::Io)?;
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with('#') {
continue;
}
let triple = parse_ntriples_line(trimmed).ok_or_else(|| NTriplesError::Parse {
line: line_no + 1,
content: line.clone(),
})?;
triples.push(triple);
}
Ok(self.bulk_load(triples))
}
/// Parses a complete Turtle document and bulk-loads it, replacing the
/// default graph's current contents.
pub fn load_turtle(&self, input: &str) -> Result<BulkLoadResult, super::turtle::TurtleError> {
let triples = super::turtle::TurtleParser::new().parse(input)?;
Ok(self.bulk_load(triples))
}
/// Parses Turtle and inserts triples incrementally in batches of
/// `batch_size` (additive — existing triples are kept, unlike
/// `load_turtle`). Returns the number of triples inserted.
pub fn load_turtle_streaming(
&self,
input: &str,
batch_size: usize,
) -> Result<usize, super::turtle::TurtleError> {
let mut sink = super::sink::BatchInsertSink::new(self, batch_size);
let mut parser = super::turtle::TurtleParser::new();
parser.parse_into(input, &mut sink)?;
Ok(sink.total_inserted())
}
/// Reads an entire Turtle document from `reader` into memory, then
/// streams it into the store in batches of `batch_size` triples.
/// Returns the number of triples inserted.
pub fn load_turtle_reader(
    &self,
    reader: impl std::io::Read,
    batch_size: usize,
) -> Result<usize, NTriplesError> {
    let mut reader = reader;
    let mut input = String::new();
    // UFCS call keeps `std::io::Read` out of the module's imports.
    std::io::Read::read_to_string(&mut reader, &mut input).map_err(NTriplesError::Io)?;
    // Turtle failures are surfaced through the N-Triples error type so
    // both reader-based loaders share one error surface.
    match self.load_turtle_streaming(&input, batch_size) {
        Ok(inserted) => Ok(inserted),
        Err(e) => Err(NTriplesError::Parse {
            line: e.line,
            content: e.message,
        }),
    }
}
/// Parses N-Triples line by line and inserts incrementally in batches of
/// `batch_size` (additive, unlike `load_ntriples`). Returns the number
/// of triples inserted. Sink errors from `finish` are reported with
/// line 0 since they are not tied to a specific input line.
pub fn load_ntriples_streaming(
&self,
reader: impl std::io::BufRead,
batch_size: usize,
) -> Result<usize, NTriplesError> {
let mut sink = super::sink::BatchInsertSink::new(self, batch_size);
for (line_no, line) in reader.lines().enumerate() {
let line = line.map_err(NTriplesError::Io)?;
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with('#') {
continue;
}
let triple = parse_ntriples_line(trimmed).ok_or_else(|| NTriplesError::Parse {
line: line_no + 1,
content: line.clone(),
})?;
sink.emit(triple).map_err(|e| NTriplesError::Parse {
line: line_no + 1,
content: e,
})?;
}
sink.finish().map_err(|e| NTriplesError::Parse {
line: 0,
content: e,
})?;
Ok(sink.total_inserted())
}
/// Serializes the default graph to a Turtle string.
pub fn to_turtle(&self) -> std::io::Result<String> {
super::turtle::TurtleSerializer::new().to_string(&self.triples())
}
/// Serializes this store (delegated to the N-Quads writer) to a string.
pub fn to_nquads(&self) -> std::io::Result<String> {
super::nquads::to_nquads_string(self)
}
/// The named graph called `name`, if it exists.
#[must_use]
pub fn graph(&self, name: &str) -> Option<Arc<RdfStore>> {
self.named_graphs.read().get(name).cloned()
}
/// Returns the named graph `name`, creating it (with this store's config)
/// if absent. Double-checked: a read-lock fast path, then the write-side
/// `entry` re-checks so a racing creator is not overwritten.
pub fn graph_or_create(&self, name: &str) -> Arc<RdfStore> {
{
let graphs = self.named_graphs.read();
if let Some(g) = graphs.get(name) {
return Arc::clone(g);
}
}
let mut graphs = self.named_graphs.write();
Arc::clone(
graphs
.entry(name.to_string())
.or_insert_with(|| Arc::new(RdfStore::with_config(self.config.clone()))),
)
}
pub fn create_graph(&self, name: &str) -> bool {
let mut graphs = self.named_graphs.write();
if graphs.contains_key(name) {
return false;
}
graphs.insert(
name.to_string(),
Arc::new(RdfStore::with_config(self.config.clone())),
);
true
}
/// Removes the named graph `name`; returns `false` if it did not exist.
pub fn drop_graph(&self, name: &str) -> bool {
self.named_graphs.write().remove(name).is_some()
}
/// Names of all named graphs (unordered).
#[must_use]
pub fn graph_names(&self) -> Vec<String> {
self.named_graphs.read().keys().cloned().collect()
}
/// Number of named graphs (the default graph is not counted).
#[must_use]
pub fn graph_count(&self) -> usize {
self.named_graphs.read().len()
}
/// Empties the default graph (`None`) or the named graph `name`.
/// Clearing a graph that does not exist is a silent no-op.
pub fn clear_graph(&self, name: Option<&str>) {
    if let Some(graph_name) = name {
        if let Some(graph) = self.named_graphs.read().get(graph_name) {
            graph.clear();
        }
    } else {
        self.clear();
    }
}
/// Drops every named graph; the default graph is untouched.
pub fn clear_all_named(&self) {
self.named_graphs.write().clear();
}
/// Copies all triples from `source` into `dest` (SPARQL COPY semantics:
/// the destination is cleared first). `None` denotes the default graph;
/// a missing source named graph contributes no triples. The source
/// snapshot is taken before the destination is cleared, so copying a
/// graph onto itself leaves its contents intact.
pub fn copy_graph(&self, source: Option<&str>, dest: Option<&str>) {
let triples = match source {
None => self.triples(),
Some(n) => self.graph(n).map(|g| g.triples()).unwrap_or_default(),
};
let dest_store: Arc<RdfStore> = match dest {
None => {
self.clear();
for t in triples {
self.insert((*t).clone());
}
return;
}
Some(n) => self.graph_or_create(n),
};
dest_store.clear();
for t in triples {
dest_store.insert((*t).clone());
}
}
/// Moves all triples from `source` into `dest` (SPARQL MOVE semantics):
/// the destination is overwritten, then the source named graph is dropped
/// (or the default graph cleared). `None` denotes the default graph.
///
/// Moving a graph onto itself is a no-op. Without the guard below, the
/// copy was a net no-op but the trailing clear/drop then destroyed the
/// graph's data.
pub fn move_graph(&self, source: Option<&str>, dest: Option<&str>) {
    if source == dest {
        return;
    }
    self.copy_graph(source, dest);
    match source {
        None => self.clear(),
        Some(n) => {
            self.drop_graph(n);
        }
    }
}
/// Adds all triples from `source` into `dest` without clearing the
/// destination (SPARQL ADD semantics). `None` denotes the default graph;
/// a missing source named graph contributes nothing.
pub fn add_graph(&self, source: Option<&str>, dest: Option<&str>) {
let triples = match source {
None => self.triples(),
Some(n) => self.graph(n).map(|g| g.triples()).unwrap_or_default(),
};
match dest {
None => {
for t in triples {
self.insert((*t).clone());
}
}
Some(n) => {
let dest_store = self.graph_or_create(n);
for t in triples {
dest_store.insert((*t).clone());
}
}
}
}
/// Pattern search across graphs, tagging each hit with its graph name.
/// `graphs` selects the scope: `None` = default graph only (tag `None`),
/// `Some(&[])` = every named graph, `Some(names)` = those named graphs
/// (missing names are skipped silently).
pub fn find_in_graphs(
&self,
pattern: &TriplePattern,
graphs: Option<&[&str]>,
) -> Vec<(Option<String>, Arc<Triple>)> {
match graphs {
None => {
self.find(pattern).into_iter().map(|t| (None, t)).collect()
}
Some([]) => {
// Empty slice means "all named graphs".
let mut results = Vec::new();
for (name, store) in self.named_graphs.read().iter() {
for t in store.find(pattern) {
results.push((Some(name.clone()), t));
}
}
results
}
Some(names) => {
let mut results = Vec::new();
let graphs = self.named_graphs.read();
for name in names {
if let Some(store) = graphs.get(*name) {
for t in store.find(pattern) {
results.push((Some((*name).to_string()), t));
}
}
}
results
}
}
}
/// Buffers an insert under `transaction_id`; the store itself is not
/// modified until `commit_transaction`.
pub fn insert_in_transaction(&self, transaction_id: TransactionId, triple: Triple) {
let mut buffer = self.tx_buffer.write();
buffer
.buffers
.entry(transaction_id)
.or_default()
.push(PendingOp::Insert(triple));
}
/// Buffers a delete under `transaction_id`; the store itself is not
/// modified until `commit_transaction`.
pub fn remove_in_transaction(&self, transaction_id: TransactionId, triple: Triple) {
let mut buffer = self.tx_buffer.write();
buffer
.buffers
.entry(transaction_id)
.or_default()
.push(PendingOp::Delete(triple));
}
/// Applies all buffered operations for `transaction_id` in the order they
/// were recorded, then discards the buffer.
///
/// Returns the number of buffered operations, not the number that
/// actually changed the store (duplicate inserts / missing deletes are
/// still counted). The buffer is drained before applying, so ops are
/// executed without holding the buffer lock.
pub fn commit_transaction(&self, transaction_id: TransactionId) -> usize {
let ops = {
let mut buffer = self.tx_buffer.write();
buffer.buffers.remove(&transaction_id).unwrap_or_default()
};
let count = ops.len();
for op in ops {
match op {
PendingOp::Insert(triple) => {
self.insert(triple);
}
PendingOp::Delete(triple) => {
self.remove(&triple);
}
}
}
count
}
/// Discards all buffered operations for `transaction_id` without applying
/// them; returns how many were discarded.
pub fn rollback_transaction(&self, transaction_id: TransactionId) -> usize {
let mut buffer = self.tx_buffer.write();
buffer
.buffers
.remove(&transaction_id)
.map_or(0, |ops| ops.len())
}
/// `true` when `transaction_id` has at least one buffered operation.
#[must_use]
pub fn has_pending_ops(&self, transaction_id: TransactionId) -> bool {
let buffer = self.tx_buffer.read();
buffer
.buffers
.get(&transaction_id)
.is_some_and(|ops| !ops.is_empty())
}
/// Like `find`, but overlays the uncommitted operations of `transaction_id`
/// (when given): committed matches shadowed by a pending delete are removed,
/// and matching pending inserts are appended.
///
/// NOTE(review): deletes are applied as a set, ignoring operation order —
/// a pending insert is surfaced even if a *later* pending delete of the
/// same triple was buffered, and an insert of a triple already in the
/// store yields a duplicate entry in the results. Confirm this matches
/// the intended read-your-writes semantics.
pub fn find_with_pending(
&self,
pattern: &TriplePattern,
transaction_id: Option<TransactionId>,
) -> Vec<Arc<Triple>> {
let mut results = self.find(pattern);
if let Some(tx) = transaction_id {
let buffer = self.tx_buffer.read();
if let Some(ops) = buffer.buffers.get(&tx) {
let pending_deletes: FxHashSet<&Triple> = ops
.iter()
.filter_map(|op| match op {
PendingOp::Delete(t) => Some(t),
_ => None,
})
.collect();
if !pending_deletes.is_empty() {
results.retain(|t| !pending_deletes.contains(t.as_ref()));
}
for op in ops {
if let PendingOp::Insert(triple) = op
&& pattern.matches(triple)
{
results.push(Arc::new(triple.clone()));
}
}
}
}
results
}
}
impl Default for RdfStore {
/// Equivalent to [`RdfStore::new`].
fn default() -> Self {
Self::new()
}
}
/// Cheap size counters reported by [`RdfStore::stats`]. `object_count` is
/// 0 when object indexing is disabled.
#[derive(Debug, Clone, Copy)]
pub struct RdfStoreStats {
pub triple_count: usize,
pub subject_count: usize,
pub predicate_count: usize,
pub object_count: usize,
pub graph_count: usize,
}
/// Outcome of a bulk load: the raw input triple count (duplicates
/// included) and the statistics gathered during the load.
#[derive(Debug, Clone)]
pub struct BulkLoadResult {
pub triple_count: usize,
pub statistics: crate::statistics::RdfStatistics,
}
/// Errors raised while loading N-Triples (also used to surface Turtle
/// errors from the reader-based loader).
#[derive(Debug)]
#[non_exhaustive]
pub enum NTriplesError {
/// Underlying reader failure.
Io(std::io::Error),
/// Malformed statement; `line` is 1-based (0 for errors not tied to a
/// specific input line).
Parse {
line: usize,
content: String,
},
}
impl std::fmt::Display for NTriplesError {
/// Human-readable error text; parse errors include the offending line.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(err) => write!(f, "I/O error: {err}"),
Self::Parse { line, content } => {
write!(f, "parse error at line {line}: {content}")
}
}
}
}
impl std::error::Error for NTriplesError {
/// Exposes the wrapped I/O error as the source; parse errors are leaves.
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Io(err) => Some(err),
Self::Parse { .. } => None,
}
}
}
/// Splits the next N-Triples term off the front of `s` (after skipping
/// leading whitespace), returning `(term, remainder)`, or `None` when the
/// input does not start with a recognizable term.
///
/// Handles the three term shapes: `<iri>`, `_:blank`, and quoted literals
/// with an optional `^^<datatype>` or `@lang` suffix.
///
/// The literal scanner walks `char_indices` so a backslash escape can never
/// step past the end of the string or land inside a multi-byte character;
/// the previous byte-wise scan (`pos += 2`) panicked on slicing for inputs
/// such as a literal ending in a lone `\` or an escape followed by a
/// non-ASCII character.
fn next_ntriples_term(s: &str) -> Option<(&str, &str)> {
    let s = s.trim_start();
    if let Some(rest) = s.strip_prefix('<') {
        // IRI: everything up to and including the first '>'.
        let end = rest.find('>')?;
        Some((&s[..end + 2], &rest[end + 1..]))
    } else if s.starts_with("_:") {
        // Blank node: runs until the next whitespace (or end of input).
        let end = s.find(|c: char| c.is_whitespace()).unwrap_or(s.len());
        Some((&s[..end], &s[end..]))
    } else if s.starts_with('"') {
        // Literal: locate the closing quote, skipping backslash escapes.
        let mut chars = s.char_indices();
        chars.next(); // consume the opening quote
        let mut close = None;
        while let Some((i, c)) = chars.next() {
            match c {
                // Skip the escaped character; `next()` simply yields `None`
                // when the backslash is the last character of the input.
                '\\' => {
                    chars.next();
                }
                '"' => {
                    close = Some(i);
                    break;
                }
                _ => {}
            }
        }
        let mut pos = match close {
            Some(i) => i + 1,
            // Unterminated literal: treat the rest of the input as the
            // term, matching the previous scanner's fall-through behavior.
            None => s.len(),
        };
        if s[pos..].starts_with("^^<") {
            // Datatype suffix; an unclosed '<' leaves the term at the quote.
            if let Some(end) = s[pos..].find('>') {
                pos += end + 1;
            }
        } else if s[pos..].starts_with('@') {
            // Language tag runs until whitespace (or end of input).
            let lang_end = s[pos..]
                .find(|c: char| c.is_whitespace())
                .unwrap_or(s.len() - pos);
            pos += lang_end;
        }
        Some((&s[..pos], &s[pos..]))
    } else {
        None
    }
}
/// Parses one N-Triples statement (`<s> <p> <o> .`) into a `Triple`,
/// returning `None` on malformed input. Note: anything after the
/// terminating `.` is not inspected, so trailing garbage on a line is
/// silently accepted.
fn parse_ntriples_line(line: &str) -> Option<Triple> {
let (subj_str, rest) = next_ntriples_term(line)?;
let (pred_str, rest) = next_ntriples_term(rest)?;
let (obj_str, rest) = next_ntriples_term(rest)?;
let rest = rest.trim();
if !rest.starts_with('.') {
return None;
}
let subject = Term::from_ntriples(subj_str)?;
let predicate = Term::from_ntriples(pred_str)?;
let object = Term::from_ntriples(obj_str)?;
Some(Triple::new(subject, predicate, object))
}
#[cfg(test)]
mod tests {
use super::*;
/// Fixture: four triples about two subjects (alix: name/age/knows,
/// gus: name) used by most tests below.
fn sample_triples() -> Vec<Triple> {
vec![
Triple::new(
Term::iri("http://example.org/alix"),
Term::iri("http://xmlns.com/foaf/0.1/name"),
Term::literal("Alix"),
),
Triple::new(
Term::iri("http://example.org/alix"),
Term::iri("http://xmlns.com/foaf/0.1/age"),
Term::typed_literal("30", "http://www.w3.org/2001/XMLSchema#integer"),
),
Triple::new(
Term::iri("http://example.org/alix"),
Term::iri("http://xmlns.com/foaf/0.1/knows"),
Term::iri("http://example.org/gus"),
),
Triple::new(
Term::iri("http://example.org/gus"),
Term::iri("http://xmlns.com/foaf/0.1/name"),
Term::literal("Gus"),
),
]
}
#[test]
fn test_insert_and_contains() {
// Inserts succeed once, are visible via contains, and duplicates are rejected.
let store = RdfStore::new();
let triples = sample_triples();
for triple in &triples {
assert!(store.insert(triple.clone()));
}
assert_eq!(store.len(), 4);
for triple in &triples {
assert!(store.contains(triple));
}
assert!(!store.insert(triples[0].clone()));
assert_eq!(store.len(), 4);
}
#[test]
fn test_remove() {
// Removing an existing triple succeeds exactly once and updates len/contains.
let store = RdfStore::new();
let triples = sample_triples();
for triple in &triples {
store.insert(triple.clone());
}
assert!(store.remove(&triples[0]));
assert_eq!(store.len(), 3);
assert!(!store.contains(&triples[0]));
assert!(!store.remove(&triples[0]));
}
#[test]
fn test_query_by_subject() {
// Subject-index lookup returns exactly alix's three triples.
let store = RdfStore::new();
for triple in sample_triples() {
store.insert(triple);
}
let alix = Term::iri("http://example.org/alix");
let alice_triples = store.triples_with_subject(&alix);
assert_eq!(alice_triples.len(), 3);
for triple in &alice_triples {
assert_eq!(triple.subject(), &alix);
}
}
#[test]
fn test_query_by_predicate() {
// Predicate-index lookup returns both foaf:name triples.
let store = RdfStore::new();
for triple in sample_triples() {
store.insert(triple);
}
let name_pred = Term::iri("http://xmlns.com/foaf/0.1/name");
let name_triples = store.triples_with_predicate(&name_pred);
assert_eq!(name_triples.len(), 2);
for triple in &name_triples {
assert_eq!(triple.predicate(), &name_pred);
}
}
#[test]
fn test_query_by_object() {
// Object-index lookup finds the single triple pointing at gus.
let store = RdfStore::new();
for triple in sample_triples() {
store.insert(triple);
}
let gus = Term::iri("http://example.org/gus");
let bob_triples = store.triples_with_object(&gus);
assert_eq!(bob_triples.len(), 1);
assert_eq!(bob_triples[0].object(), &gus);
}
#[test]
fn test_pattern_matching() {
// An SP-bound pattern resolves through the SP index to the knows triple.
let store = RdfStore::new();
for triple in sample_triples() {
store.insert(triple);
}
let pattern = TriplePattern {
subject: Some(Term::iri("http://example.org/alix")),
predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/knows")),
object: None,
};
let results = store.find(&pattern);
assert_eq!(results.len(), 1);
assert_eq!(results[0].object(), &Term::iri("http://example.org/gus"));
}
#[test]
fn test_stats() {
    // Sample data cardinalities: 4 triples, 2 subjects, 3 predicates.
    let store = RdfStore::new();
    for triple in sample_triples() {
        store.insert(triple);
    }
    let snapshot = store.stats();
    assert_eq!(snapshot.triple_count, 4);
    assert_eq!(snapshot.subject_count, 2);
    assert_eq!(snapshot.predicate_count, 3);
}
#[test]
fn test_clear() {
// clear() empties a populated store.
let store = RdfStore::new();
for triple in sample_triples() {
store.insert(triple);
}
assert!(!store.is_empty());
store.clear();
assert!(store.is_empty());
assert_eq!(store.len(), 0);
}
#[test]
fn test_find_with_pending_filters_deletes() {
// Pending deletes hide committed matches; pending inserts become visible;
// queries without a transaction id see the committed state only.
let store = RdfStore::new();
let triples = sample_triples();
for triple in &triples {
store.insert(triple.clone());
}
let transaction_id = TransactionId::new(1);
store.remove_in_transaction(transaction_id, triples[0].clone());
let pattern = TriplePattern {
subject: Some(Term::iri("http://example.org/alix")),
predicate: None,
object: None,
};
let results = store.find_with_pending(&pattern, Some(transaction_id));
assert_eq!(results.len(), 2);
let deleted = &triples[0];
for result in &results {
assert_ne!(result.as_ref(), deleted);
}
let results_no_tx = store.find_with_pending(&pattern, None);
assert_eq!(results_no_tx.len(), 3);
let new_triple = Triple::new(
Term::iri("http://example.org/alix"),
Term::iri("http://xmlns.com/foaf/0.1/email"),
Term::literal("alix@example.org"),
);
store.insert_in_transaction(transaction_id, new_triple.clone());
let results_with_insert = store.find_with_pending(&pattern, Some(transaction_id));
assert_eq!(results_with_insert.len(), 3);
let found_new = results_with_insert
.iter()
.any(|t| t.as_ref() == &new_triple);
assert!(found_new, "Pending insert should be visible");
}
#[test]
fn test_named_graph_crud() {
// Create/duplicate-create/drop of a named graph, and isolation of its
// contents from the default graph.
let store = RdfStore::new();
assert!(store.create_graph("http://example.org/g1"));
assert!(!store.create_graph("http://example.org/g1")); assert_eq!(store.graph_count(), 1);
let g1 = store.graph("http://example.org/g1").unwrap();
g1.insert(Triple::new(
Term::iri("http://example.org/s1"),
Term::iri("http://example.org/p1"),
Term::literal("o1"),
));
assert_eq!(g1.len(), 1);
assert_eq!(store.len(), 0);
let results = g1.find(&TriplePattern {
subject: None,
predicate: None,
object: None,
});
assert_eq!(results.len(), 1);
assert!(store.drop_graph("http://example.org/g1"));
assert!(!store.drop_graph("http://example.org/g1"));
assert_eq!(store.graph_count(), 0);
}
#[test]
fn test_named_graph_isolation() {
// Default graph and a named graph keep independent triples for the
// same subject/predicate.
let store = RdfStore::new();
store.insert(Triple::new(
Term::iri("http://example.org/a"),
Term::iri("http://example.org/p"),
Term::literal("default"),
));
let g1 = store.graph_or_create("http://example.org/g1");
g1.insert(Triple::new(
Term::iri("http://example.org/a"),
Term::iri("http://example.org/p"),
Term::literal("named"),
));
assert_eq!(store.len(), 1);
assert_eq!(g1.len(), 1);
assert_eq!(store.triples()[0].object(), &Term::literal("default"));
assert_eq!(g1.triples()[0].object(), &Term::literal("named"));
}
#[test]
fn test_find_in_graphs() {
// Scope selection: None = default graph, Some(&[]) = all named graphs,
// Some(names) = listed graphs only.
let store = RdfStore::new();
let pattern = TriplePattern {
subject: None,
predicate: None,
object: None,
};
store.insert(Triple::new(
Term::iri("http://example.org/s"),
Term::iri("http://example.org/p"),
Term::literal("default"),
));
let g1 = store.graph_or_create("http://example.org/g1");
g1.insert(Triple::new(
Term::iri("http://example.org/s"),
Term::iri("http://example.org/p"),
Term::literal("g1"),
));
let results = store.find_in_graphs(&pattern, None);
assert_eq!(results.len(), 1);
assert!(results[0].0.is_none());
let results = store.find_in_graphs(&pattern, Some(&[]));
assert_eq!(results.len(), 1);
assert_eq!(results[0].0.as_deref(), Some("http://example.org/g1"));
let results = store.find_in_graphs(&pattern, Some(&["http://example.org/g1"]));
assert_eq!(results.len(), 1);
assert_eq!(results[0].0.as_deref(), Some("http://example.org/g1"));
}
#[test]
fn test_copy_move_add_graph() {
// COPY leaves the source intact, ADD merges without clearing, and MOVE
// transfers contents then drops the source graph.
let store = RdfStore::new();
let triple = Triple::new(
Term::iri("http://example.org/s"),
Term::iri("http://example.org/p"),
Term::literal("value"),
);
store.insert(triple.clone());
store.copy_graph(None, Some("http://example.org/copy"));
assert_eq!(store.len(), 1); let copy = store.graph("http://example.org/copy").unwrap();
assert_eq!(copy.len(), 1);
let g2 = store.graph_or_create("http://example.org/g2");
g2.insert(Triple::new(
Term::iri("http://example.org/s2"),
Term::iri("http://example.org/p2"),
Term::literal("extra"),
));
store.add_graph(
Some("http://example.org/copy"),
Some("http://example.org/g2"),
);
assert_eq!(g2.len(), 2);
store.move_graph(Some("http://example.org/g2"), Some("http://example.org/g3"));
assert!(store.graph("http://example.org/g2").is_none());
let g3 = store.graph("http://example.org/g3").unwrap();
assert_eq!(g3.len(), 2);
}
#[test]
fn test_transaction_commit_and_rollback() {
    // Buffered deletes only take effect on commit; rollback discards them.
    let store = RdfStore::new();
    let triples = sample_triples();
    for triple in &triples {
        store.insert(triple.clone());
    }
    assert_eq!(store.len(), 4);
    // Rollback path: the pending delete is discarded and the store is intact.
    let tx1 = TransactionId::new(1);
    store.remove_in_transaction(tx1, triples[0].clone());
    assert!(store.has_pending_ops(tx1));
    let discarded = store.rollback_transaction(tx1);
    assert_eq!(discarded, 1);
    assert!(!store.has_pending_ops(tx1));
    assert_eq!(store.len(), 4);
    // Commit path: the pending delete is applied to the store.
    let tx2 = TransactionId::new(2);
    store.remove_in_transaction(tx2, triples[0].clone());
    let applied = store.commit_transaction(tx2);
    assert_eq!(applied, 1);
    assert_eq!(store.len(), 3);
    assert!(!store.contains(&triples[0]));
}
#[test]
fn test_batch_insert() {
    // batch_insert reports how many triples were newly added and keeps the
    // subject index in sync.
    let store = RdfStore::new();
    let triples = sample_triples();
    assert_eq!(store.batch_insert(triples.clone()), 4);
    assert_eq!(store.len(), 4);
    assert!(triples.iter().all(|t| store.contains(t)));
    let alix = Term::iri("http://example.org/alix");
    assert_eq!(store.triples_with_subject(&alix).len(), 3);
}
#[test]
fn test_batch_insert_with_duplicates() {
    // A triple already present is not counted again by batch_insert.
    let store = RdfStore::new();
    let triples = sample_triples();
    store.insert(triples[0].clone());
    assert_eq!(store.len(), 1);
    assert_eq!(store.batch_insert(triples), 3);
    assert_eq!(store.len(), 4);
}
#[test]
fn test_batch_insert_empty() {
    // An empty batch is a no-op and reports zero insertions.
    let store = RdfStore::new();
    assert_eq!(store.batch_insert(Vec::<Triple>::new()), 0);
    assert_eq!(store.len(), 0);
}
#[test]
fn test_composite_index_sp_lookup() {
    // A (subject, predicate) pattern resolves to exactly the one matching triple.
    let store = RdfStore::new();
    for t in sample_triples() {
        store.insert(t);
    }
    let sp = TriplePattern {
        subject: Some(Term::iri("http://example.org/alix")),
        predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/name")),
        object: None,
    };
    let hits = store.find(&sp);
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].object(), &Term::literal("Alix"));
}
#[test]
fn test_composite_index_po_lookup() {
    // A (predicate, object) pattern resolves back to the correct subject.
    let store = RdfStore::new();
    for t in sample_triples() {
        store.insert(t);
    }
    let po = TriplePattern {
        subject: None,
        predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/name")),
        object: Some(Term::literal("Alix")),
    };
    let hits = store.find(&po);
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].subject(), &Term::iri("http://example.org/alix"));
}
#[test]
fn test_composite_index_os_lookup() {
    // An (object, subject) pattern resolves to the connecting predicate.
    let store = RdfStore::new();
    for t in sample_triples() {
        store.insert(t);
    }
    let os = TriplePattern {
        subject: Some(Term::iri("http://example.org/alix")),
        predicate: None,
        object: Some(Term::iri("http://example.org/gus")),
    };
    let hits = store.find(&os);
    assert_eq!(hits.len(), 1);
    assert_eq!(
        hits[0].predicate(),
        &Term::iri("http://xmlns.com/foaf/0.1/knows")
    );
}
#[test]
fn test_composite_index_spo_lookup() {
    // A fully bound pattern is an existence check: one hit when the triple is
    // present, none when any component differs.
    let store = RdfStore::new();
    for t in sample_triples() {
        store.insert(t);
    }
    let present = TriplePattern {
        subject: Some(Term::iri("http://example.org/alix")),
        predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/name")),
        object: Some(Term::literal("Alix")),
    };
    assert_eq!(store.find(&present).len(), 1);
    let absent = TriplePattern {
        subject: Some(Term::iri("http://example.org/alix")),
        predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/name")),
        object: Some(Term::literal("NotAlix")),
    };
    assert_eq!(store.find(&absent).len(), 0);
}
#[test]
fn test_composite_index_removal() {
    // Removing a triple must purge it from both the SP and PO lookups.
    let store = RdfStore::new();
    let triples = sample_triples();
    for t in &triples {
        store.insert(t.clone());
    }
    store.remove(&triples[0]);
    let sp = TriplePattern {
        subject: Some(Term::iri("http://example.org/alix")),
        predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/name")),
        object: None,
    };
    assert_eq!(store.find(&sp).len(), 0);
    let po = TriplePattern {
        subject: None,
        predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/name")),
        object: Some(Term::literal("Alix")),
    };
    assert_eq!(store.find(&po).len(), 0);
}
#[test]
fn test_composite_index_batch_insert() {
    // Triples added via batch_insert are reachable through SP and PO lookups.
    let store = RdfStore::new();
    store.batch_insert(sample_triples());
    let sp = TriplePattern {
        subject: Some(Term::iri("http://example.org/alix")),
        predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/knows")),
        object: None,
    };
    let hits = store.find(&sp);
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].object(), &Term::iri("http://example.org/gus"));
    let po = TriplePattern {
        subject: None,
        predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/name")),
        object: Some(Term::literal("Gus")),
    };
    assert_eq!(store.find(&po).len(), 1);
}
#[test]
fn test_bulk_load() {
    // bulk_load replaces any existing contents and returns load statistics.
    let store = RdfStore::new();
    store.insert(Triple::new(
        Term::iri("http://example.org/old"),
        Term::iri("http://example.org/p"),
        Term::literal("old"),
    ));
    let result = store.bulk_load(sample_triples());
    assert_eq!(result.triple_count, 4);
    assert_eq!(store.len(), 4);
    // The pre-existing triple must be gone after the load.
    let stale = Triple::new(
        Term::iri("http://example.org/old"),
        Term::iri("http://example.org/p"),
        Term::literal("old"),
    );
    assert!(!store.contains(&stale));
    let alix = Term::iri("http://example.org/alix");
    assert_eq!(store.triples_with_subject(&alix).len(), 3);
    let sp = TriplePattern {
        subject: Some(Term::iri("http://example.org/alix")),
        predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/name")),
        object: None,
    };
    assert_eq!(store.find(&sp).len(), 1);
    // Statistics reflect the loaded data, not the replaced triple.
    assert_eq!(result.statistics.total_triples, 4);
    assert_eq!(result.statistics.subject_count, 2);
    assert_eq!(result.statistics.predicate_count, 3);
}
#[test]
fn test_bulk_load_empty() {
    // Loading an empty batch still replaces everything, leaving an empty store.
    let store = RdfStore::new();
    store.insert(Triple::new(
        Term::iri("http://example.org/s"),
        Term::iri("http://example.org/p"),
        Term::literal("v"),
    ));
    let result = store.bulk_load(Vec::<Triple>::new());
    assert_eq!(result.triple_count, 0);
    assert_eq!(store.len(), 0);
    assert!(store.is_empty());
}
#[test]
fn test_parse_ntriples_line() {
    // Plain string literal object.
    let parsed = parse_ntriples_line(
        r#"<http://example.org/alix> <http://xmlns.com/foaf/0.1/name> "Alix" ."#,
    );
    assert!(parsed.is_some());
    let parsed = parsed.unwrap();
    assert_eq!(parsed.subject(), &Term::iri("http://example.org/alix"));
    assert_eq!(
        parsed.predicate(),
        &Term::iri("http://xmlns.com/foaf/0.1/name")
    );
    assert_eq!(parsed.object(), &Term::literal("Alix"));
    // Typed literal object (^^<datatype>).
    let parsed = parse_ntriples_line(
        r#"<http://example.org/alix> <http://xmlns.com/foaf/0.1/age> "30"^^<http://www.w3.org/2001/XMLSchema#integer> ."#,
    );
    assert!(parsed.is_some());
    assert_eq!(
        parsed.unwrap().object(),
        &Term::typed_literal("30", "http://www.w3.org/2001/XMLSchema#integer")
    );
    // Language-tagged literal object (@lang).
    let parsed = parse_ntriples_line(
        r#"<http://example.org/alix> <http://xmlns.com/foaf/0.1/name> "Alix"@en ."#,
    );
    assert!(parsed.is_some());
    assert_eq!(parsed.unwrap().object(), &Term::lang_literal("Alix", "en"));
    // Blank-node subject (_:label).
    let parsed = parse_ntriples_line(r#"_:b0 <http://xmlns.com/foaf/0.1/name> "Gus" ."#);
    assert!(parsed.is_some());
    assert_eq!(parsed.unwrap().subject(), &Term::blank("b0"));
    // IRI object.
    let parsed = parse_ntriples_line(
        r#"<http://example.org/alix> <http://xmlns.com/foaf/0.1/knows> <http://example.org/gus> ."#,
    );
    assert!(parsed.is_some());
    assert_eq!(
        parsed.unwrap().object(),
        &Term::iri("http://example.org/gus")
    );
    // Missing terminating dot: rejected.
    assert!(
        parse_ntriples_line(r#"<http://example.org/s> <http://example.org/p> "v""#,).is_none()
    );
}
#[test]
fn test_load_ntriples() {
    // Comment lines are skipped; the three data lines all load.
    let input = "\
<http://example.org/alix> <http://xmlns.com/foaf/0.1/name> \"Alix\" .
# This is a comment
<http://example.org/alix> <http://xmlns.com/foaf/0.1/knows> <http://example.org/gus> .
<http://example.org/gus> <http://xmlns.com/foaf/0.1/name> \"Gus\" .
";
    let store = RdfStore::new();
    let result = store.load_ntriples(input.as_bytes()).unwrap();
    assert_eq!(result.triple_count, 3);
    assert_eq!(store.len(), 3);
    assert_eq!(result.statistics.total_triples, 3);
    let po = TriplePattern {
        subject: None,
        predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/name")),
        object: Some(Term::literal("Gus")),
    };
    assert_eq!(store.find(&po).len(), 1);
}
#[test]
fn test_load_turtle_roundtrip() {
    // Turtle with prefixes, `a`, and `;` predicate lists expands to 4 triples.
    let input = r#"
@prefix ex: <http://example.org/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
ex:alix a foaf:Person ;
foaf:name "Alix" ;
foaf:knows ex:gus .
ex:gus foaf:name "Gus" .
"#;
    let store = RdfStore::new();
    let result = store.load_turtle(input).unwrap();
    assert_eq!(result.triple_count, 4);
    assert_eq!(store.len(), 4);
    let alix = Term::iri("http://example.org/alix");
    assert_eq!(store.triples_with_subject(&alix).len(), 3);
}
#[test]
fn test_to_turtle_roundtrip() {
    // Serialize to Turtle, parse the output into a fresh store, and check
    // that the data survives the round trip intact.
    let input = r#"
@prefix ex: <http://example.org/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
ex:alix foaf:name "Alix" ;
foaf:knows ex:gus .
ex:gus foaf:name "Gus" .
"#;
    let original = RdfStore::new();
    original.load_turtle(input).unwrap();
    assert_eq!(original.len(), 3);
    let serialized = original.to_turtle().unwrap();
    assert!(!serialized.is_empty());
    let reloaded = RdfStore::new();
    let result = reloaded.load_turtle(&serialized).unwrap();
    assert_eq!(result.triple_count, 3);
    // Alix's name survived.
    let alix_name = TriplePattern {
        subject: Some(Term::iri("http://example.org/alix")),
        predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/name")),
        object: None,
    };
    let hits = reloaded.find(&alix_name);
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].object(), &Term::literal("Alix"));
    // Gus's name survived.
    let gus_name = TriplePattern {
        subject: Some(Term::iri("http://example.org/gus")),
        predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/name")),
        object: None,
    };
    let hits = reloaded.find(&gus_name);
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].object(), &Term::literal("Gus"));
}
#[test]
fn test_load_ntriples_parse_error() {
    // The malformed second line produces a Parse error carrying its line number.
    let input = "\
<http://example.org/s> <http://example.org/p> \"ok\" .
this is not valid ntriples
<http://example.org/s2> <http://example.org/p2> \"ok2\" .
";
    let store = RdfStore::new();
    let err = store.load_ntriples(input.as_bytes()).unwrap_err();
    match err {
        NTriplesError::Parse { line, .. } => assert_eq!(line, 2),
        _ => panic!("expected Parse error"),
    }
}
#[test]
fn test_load_turtle_streaming_inserts_incrementally() {
    // Streaming load with a small batch size still produces all 4 triples.
    let input = r#"
@prefix ex: <http://example.org/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
ex:alix a foaf:Person ;
foaf:name "Alix" ;
foaf:knows ex:gus .
ex:gus foaf:name "Gus" .
"#;
    let store = RdfStore::new();
    let loaded = store.load_turtle_streaming(input, 2).unwrap();
    assert_eq!(loaded, 4);
    assert_eq!(store.len(), 4);
    let alix = Term::iri("http://example.org/alix");
    assert_eq!(store.triples_with_subject(&alix).len(), 3);
}
#[test]
fn test_load_turtle_streaming_does_not_replace_existing() {
    // Unlike bulk_load, streaming load adds to the store rather than replacing it.
    let store = RdfStore::new();
    store.insert(Triple::new(
        Term::iri("http://example.org/existing"),
        Term::iri("http://example.org/p"),
        Term::literal("value"),
    ));
    assert_eq!(store.len(), 1);
    let input = r#"
<http://example.org/new> <http://example.org/p> "added" .
"#;
    let loaded = store.load_turtle_streaming(input, 100).unwrap();
    assert_eq!(loaded, 1);
    assert_eq!(store.len(), 2);
}
#[test]
fn test_load_turtle_streaming_deduplicates() {
    // The same triple repeated three times is stored (and counted) once.
    let input = r#"
<http://example.org/s> <http://example.org/p> "o" .
<http://example.org/s> <http://example.org/p> "o" .
<http://example.org/s> <http://example.org/p> "o" .
"#;
    let store = RdfStore::new();
    let loaded = store.load_turtle_streaming(input, 100).unwrap();
    assert_eq!(loaded, 1, "duplicates should be filtered by batch_insert");
    assert_eq!(store.len(), 1);
}
#[test]
fn test_load_ntriples_streaming() {
    // Streaming N-Triples load across batch boundaries (batch size 2, 3 lines).
    let input = "\
<http://example.org/alix> <http://xmlns.com/foaf/0.1/name> \"Alix\" .
<http://example.org/alix> <http://xmlns.com/foaf/0.1/knows> <http://example.org/gus> .
<http://example.org/gus> <http://xmlns.com/foaf/0.1/name> \"Gus\" .
";
    let store = RdfStore::new();
    let loaded = store
        .load_ntriples_streaming(input.as_bytes(), 2)
        .unwrap();
    assert_eq!(loaded, 3);
    assert_eq!(store.len(), 3);
}
#[test]
fn test_load_ntriples_streaming_does_not_replace_existing() {
    // Streaming N-Triples load is additive: prior contents stay put.
    let store = RdfStore::new();
    store.insert(Triple::new(
        Term::iri("http://example.org/existing"),
        Term::iri("http://example.org/p"),
        Term::literal("value"),
    ));
    let input = "<http://example.org/new> <http://example.org/p> \"added\" .\n";
    let loaded = store
        .load_ntriples_streaming(input.as_bytes(), 100)
        .unwrap();
    assert_eq!(loaded, 1);
    assert_eq!(store.len(), 2);
}
#[test]
fn test_load_turtle_reader() {
    // Turtle can also be loaded from any reader (here, a byte slice).
    let input = r#"
@prefix ex: <http://example.org/> .
ex:alix ex:name "Alix" .
ex:gus ex:name "Gus" .
"#;
    let store = RdfStore::new();
    let loaded = store.load_turtle_reader(input.as_bytes(), 100).unwrap();
    assert_eq!(loaded, 2);
    assert_eq!(store.len(), 2);
}
#[test]
fn test_parse_into_with_count_sink() {
    // The parser can feed any TripleSink; CountSink just tallies triples.
    use crate::graph::rdf::sink::CountSink;
    use crate::graph::rdf::turtle::TurtleParser;
    let input = r#"
@prefix ex: <http://example.org/> .
ex:a ex:p "1" .
ex:b ex:p "2" .
ex:c ex:p "3" .
"#;
    let mut counter = CountSink::new();
    let mut parser = TurtleParser::new();
    parser.parse_into(input, &mut counter).unwrap();
    assert_eq!(counter.count(), 3);
}
#[test]
fn test_streaming_turtle_with_collections() {
    // An RDF collection ( "a" "b" "c" ) expands to the list-linking triples:
    // the loader reports 7 triples total for this input.
    let input = r#"
@prefix ex: <http://example.org/> .
ex:list ex:items ( "a" "b" "c" ) .
"#;
    let store = RdfStore::new();
    let loaded = store.load_turtle_streaming(input, 100).unwrap();
    assert_eq!(loaded, 7);
    assert_eq!(store.len(), 7);
}
#[test]
fn test_streaming_turtle_with_blank_node_property_list() {
    // A blank-node property list [ p1 o1 ; p2 o2 ] yields 3 triples:
    // the link to the blank node plus its two properties.
    let input = r#"
@prefix ex: <http://example.org/> .
ex:alix ex:address [ ex:city "Amsterdam" ; ex:country "NL" ] .
"#;
    let store = RdfStore::new();
    let loaded = store.load_turtle_streaming(input, 100).unwrap();
    assert_eq!(loaded, 3);
    assert_eq!(store.len(), 3);
}
#[test]
#[cfg(feature = "ring-index")]
fn test_ring_built_during_bulk_load() {
    // The ring index is constructed as a side effect of bulk_load only.
    let store = RdfStore::new();
    assert!(
        store.ring().is_none(),
        "ring should not exist before bulk load"
    );
    let edges = vec![
        Triple::new(
            Term::iri("http://ex.org/alix"),
            Term::iri("http://ex.org/knows"),
            Term::iri("http://ex.org/gus"),
        ),
        Triple::new(
            Term::iri("http://ex.org/gus"),
            Term::iri("http://ex.org/knows"),
            Term::iri("http://ex.org/alix"),
        ),
    ];
    store.bulk_load(edges);
    let ring = store.ring().expect("ring should exist after bulk load");
    assert_eq!(ring.len(), 2);
}
#[test]
#[cfg(feature = "ring-index")]
fn test_ring_stale_after_insert() {
    // Incremental inserts invalidate the ring; rebuild_ring restores it.
    let store = RdfStore::new();
    store.bulk_load(vec![Triple::new(
        Term::iri("http://ex.org/a"),
        Term::iri("http://ex.org/p"),
        Term::iri("http://ex.org/b"),
    )]);
    assert!(store.ring().is_some());
    store.insert(Triple::new(
        Term::iri("http://ex.org/c"),
        Term::iri("http://ex.org/p"),
        Term::iri("http://ex.org/d"),
    ));
    assert!(
        store.ring().is_none(),
        "ring should be stale after incremental insert"
    );
    store.rebuild_ring();
    // After rebuild the ring covers both triples now in the store.
    let ring = store.ring().expect("ring should exist after rebuild");
    assert_eq!(ring.len(), 2);
}
#[test]
#[cfg(feature = "ring-index")]
fn test_ring_find_matches_store_find() {
    // Pattern counts answered by the ring must agree with the store contents.
    let data = vec![
        Triple::new(
            Term::iri("http://ex.org/alix"),
            Term::iri("http://xmlns.com/foaf/0.1/name"),
            Term::literal("Alix"),
        ),
        Triple::new(
            Term::iri("http://ex.org/gus"),
            Term::iri("http://xmlns.com/foaf/0.1/name"),
            Term::literal("Gus"),
        ),
        Triple::new(
            Term::iri("http://ex.org/alix"),
            Term::iri("http://xmlns.com/foaf/0.1/knows"),
            Term::iri("http://ex.org/gus"),
        ),
    ];
    let store = RdfStore::new();
    store.bulk_load(data);
    let ring = store.ring().expect("ring should exist");
    // Unbound pattern counts everything.
    let everything = TriplePattern {
        subject: None,
        predicate: None,
        object: None,
    };
    assert_eq!(ring.count(&everything), store.len());
    // Predicate-bound: two foaf:name triples.
    let by_name = TriplePattern {
        subject: None,
        predicate: Some(Term::iri("http://xmlns.com/foaf/0.1/name")),
        object: None,
    };
    assert_eq!(ring.count(&by_name), 2);
    // Subject-bound: two triples about alix.
    let by_alix = TriplePattern {
        subject: Some(Term::iri("http://ex.org/alix")),
        predicate: None,
        object: None,
    };
    assert_eq!(ring.count(&by_alix), 2);
}
#[test]
fn test_batch_insert_composite_indexes() {
    // After a batch insert, SP, PO, and OS lookups each find their triple.
    let store = RdfStore::new();
    let batch = vec![
        Triple::new(
            Term::iri("http://ex.org/alix"),
            Term::iri("http://ex.org/name"),
            Term::literal("Alix"),
        ),
        Triple::new(
            Term::iri("http://ex.org/gus"),
            Term::iri("http://ex.org/name"),
            Term::literal("Gus"),
        ),
        Triple::new(
            Term::iri("http://ex.org/alix"),
            Term::iri("http://ex.org/age"),
            Term::typed_literal("30", "http://www.w3.org/2001/XMLSchema#integer"),
        ),
    ];
    assert_eq!(store.batch_insert(batch), 3);
    assert_eq!(store.len(), 3);
    // SP: subject + predicate bound.
    let sp = TriplePattern {
        subject: Some(Term::iri("http://ex.org/alix")),
        predicate: Some(Term::iri("http://ex.org/name")),
        object: None,
    };
    assert_eq!(store.find(&sp).len(), 1);
    // PO: predicate + object bound.
    let po = TriplePattern {
        subject: None,
        predicate: Some(Term::iri("http://ex.org/name")),
        object: Some(Term::literal("Gus")),
    };
    assert_eq!(store.find(&po).len(), 1);
    // OS: object + subject bound.
    let os = TriplePattern {
        subject: Some(Term::iri("http://ex.org/alix")),
        predicate: None,
        object: Some(Term::literal("Alix")),
    };
    assert_eq!(store.find(&os).len(), 1);
}
#[test]
fn test_batch_insert_deduplication() {
    // A batch containing the same triple three times inserts it only once.
    let store = RdfStore::new();
    let triple = Triple::new(
        Term::iri("http://ex.org/a"),
        Term::iri("http://ex.org/b"),
        Term::literal("c"),
    );
    let batch = vec![triple.clone(), triple.clone(), triple];
    assert_eq!(store.batch_insert(batch), 1);
    assert_eq!(store.len(), 1);
}
#[test]
fn test_composite_index_after_remove() {
    // Two triples share the same (subject, predicate); removing one must
    // leave the other reachable through the SP index.
    let store = RdfStore::new();
    let first = Triple::new(
        Term::iri("http://ex.org/a"),
        Term::iri("http://ex.org/p"),
        Term::literal("v1"),
    );
    let second = Triple::new(
        Term::iri("http://ex.org/a"),
        Term::iri("http://ex.org/p"),
        Term::literal("v2"),
    );
    store.insert(first.clone());
    store.insert(second);
    // The pattern is borrowed by find(), so one instance serves both queries.
    let sp = TriplePattern {
        subject: Some(Term::iri("http://ex.org/a")),
        predicate: Some(Term::iri("http://ex.org/p")),
        object: None,
    };
    assert_eq!(store.find(&sp).len(), 2);
    store.remove(&first);
    assert_eq!(store.find(&sp).len(), 1);
}
#[test]
fn test_named_graph_operations() {
    // Named-graph lifecycle: create is idempotent-false on repeat, drop
    // removes the graph entirely.
    let store = RdfStore::new();
    assert!(store.create_graph("http://ex.org/g1"));
    // Creating the same graph twice reports false the second time.
    assert!(!store.create_graph("http://ex.org/g1"));
    let g = store.graph("http://ex.org/g1").unwrap();
    g.insert(Triple::new(
        Term::iri("http://ex.org/a"),
        Term::iri("http://ex.org/b"),
        Term::literal("c"),
    ));
    assert_eq!(g.len(), 1);
    assert!(store.drop_graph("http://ex.org/g1"));
    assert!(store.graph("http://ex.org/g1").is_none());
}
#[test]
fn test_statistics_cache_invalidation() {
    // Statistics collected after an insert must reflect the new predicate,
    // i.e. the cached statistics are invalidated by writes.
    let store = RdfStore::new();
    store.insert(Triple::new(
        Term::iri("http://ex.org/a"),
        Term::iri("http://ex.org/p"),
        Term::literal("v"),
    ));
    let before = store.get_or_collect_statistics();
    assert!(before.get_predicate("<http://ex.org/p>").is_some());
    store.insert(Triple::new(
        Term::iri("http://ex.org/b"),
        Term::iri("http://ex.org/q"),
        Term::literal("w"),
    ));
    let after = store.get_or_collect_statistics();
    assert!(after.get_predicate("<http://ex.org/q>").is_some());
}
#[test]
fn test_find_three_bound() {
    // With all three components bound, find returns exactly the matching
    // triple even when another triple shares the same subject and predicate.
    let store = RdfStore::new();
    let wanted = Triple::new(
        Term::iri("http://ex.org/a"),
        Term::iri("http://ex.org/p"),
        Term::literal("v"),
    );
    store.insert(wanted.clone());
    store.insert(Triple::new(
        Term::iri("http://ex.org/a"),
        Term::iri("http://ex.org/p"),
        Term::literal("other"),
    ));
    let spo = TriplePattern {
        subject: Some(Term::iri("http://ex.org/a")),
        predicate: Some(Term::iri("http://ex.org/p")),
        object: Some(Term::literal("v")),
    };
    let hits = store.find(&spo);
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].as_ref(), &wanted);
}
}