use crate::model::{Object, Predicate, Subject, Triple};
use crate::store::term_interner::TermInterner;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::{Arc, RwLock};
/// Shared, lock-protected three-level index over interned ids:
/// first key -> second key -> set of third keys.
///
/// The same shape backs the SPO, POS and OSP permutations; which term each level holds
/// depends on the permutation the index belongs to.
type TripleIndex = Arc<RwLock<BTreeMap<u32, BTreeMap<u32, BTreeSet<u32>>>>>;
/// A triple whose subject, predicate and object have been replaced by interner ids.
///
/// The derived `Ord` compares `subject_id`, then `predicate_id`, then `object_id`
/// (field declaration order), so reordering the fields changes the ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct InternedTriple {
/// Interner id of the subject term.
pub subject_id: u32,
/// Interner id of the predicate term.
pub predicate_id: u32,
/// Interner id of the object term.
pub object_id: u32,
}
/// Identifies one of the three permutation indexes by the order of its keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexType {
/// Subject -> predicate -> objects.
SPO,
/// Predicate -> object -> subjects.
POS,
/// Object -> subject -> predicates.
OSP,
}
/// An in-memory triple store that keeps every triple in three permutation indexes
/// (SPO, POS, OSP) over interned `u32` term ids.
///
/// Every pattern with at least one bound position is routed to an index whose leading
/// key is bound. All state sits behind `Arc<RwLock<..>>`, and every method takes `&self`.
#[derive(Debug)]
pub struct IndexedGraph {
/// Maps terms to the `u32` ids stored in the indexes; held in an `Arc` so it can be shared.
interner: Arc<TermInterner>,
/// subject -> predicate -> objects.
spo_index: TripleIndex,
/// predicate -> object -> subjects.
pos_index: TripleIndex,
/// object -> subject -> predicates.
osp_index: TripleIndex,
/// Number of distinct triples, maintained by the insert/remove paths.
triple_count: Arc<RwLock<usize>>,
/// Usage counters, returned by copy from `index_stats`.
stats: Arc<RwLock<IndexStats>>,
}
/// Running counters describing how an `IndexedGraph` has been used.
///
/// Obtain a snapshot with `IndexedGraph::index_stats`. `IndexedGraph::clear` does not
/// reset these counters.
#[derive(Debug, Clone, Default)]
pub struct IndexStats {
/// Queries routed to the SPO index. Counted in `query` even when the query then returns
/// early because a bound term is unknown.
pub spo_lookups: usize,
/// Queries routed to the POS index (same counting rule as `spo_lookups`).
pub pos_lookups: usize,
/// Queries routed to the OSP index (same counting rule as `spo_lookups`).
pub osp_lookups: usize,
/// Triples actually added; duplicates that were rejected are not counted.
pub total_insertions: usize,
/// Triples actually removed.
pub total_deletions: usize,
/// Number of `batch_insert` calls, regardless of how many triples each one added.
pub batch_insertions: usize,
}
impl IndexStats {
    /// Returns the index that has served the most queries so far.
    ///
    /// Ties favour SPO, then POS, so OSP is reported only when its counter is strictly
    /// greater than both of the others.
    pub fn most_used_index(&self) -> IndexType {
        let busiest_other = self.pos_lookups.max(self.osp_lookups);
        if self.spo_lookups >= busiest_other {
            return IndexType::SPO;
        }
        if self.pos_lookups >= self.osp_lookups {
            IndexType::POS
        } else {
            IndexType::OSP
        }
    }
}
impl IndexedGraph {
pub fn new() -> Self {
IndexedGraph {
interner: Arc::new(TermInterner::new()),
spo_index: Arc::new(RwLock::new(BTreeMap::new())),
pos_index: Arc::new(RwLock::new(BTreeMap::new())),
osp_index: Arc::new(RwLock::new(BTreeMap::new())),
triple_count: Arc::new(RwLock::new(0)),
stats: Arc::new(RwLock::new(IndexStats::default())),
}
}
pub fn with_interner(interner: Arc<TermInterner>) -> Self {
IndexedGraph {
interner,
spo_index: Arc::new(RwLock::new(BTreeMap::new())),
pos_index: Arc::new(RwLock::new(BTreeMap::new())),
osp_index: Arc::new(RwLock::new(BTreeMap::new())),
triple_count: Arc::new(RwLock::new(0)),
stats: Arc::new(RwLock::new(IndexStats::default())),
}
}
/// Adds `triple` to the graph and returns `true` if it was not already present.
///
/// The subject, predicate and object are interned (in that order) before the indexes
/// are updated, so even a duplicate insert registers its terms with the interner.
pub fn insert(&self, triple: &Triple) -> bool {
    self.insert_interned(InternedTriple {
        subject_id: self.interner.intern_subject(triple.subject()),
        predicate_id: self.interner.intern_predicate(triple.predicate()),
        object_id: self.interner.intern_object(triple.object()),
    })
}
fn insert_interned(&self, triple: InternedTriple) -> bool {
let mut spo = self.spo_index.write().expect("spo_index lock poisoned");
let mut pos = self.pos_index.write().expect("pos_index lock poisoned");
let mut osp = self.osp_index.write().expect("osp_index lock poisoned");
if let Some(po_map) = spo.get(&triple.subject_id) {
if let Some(o_set) = po_map.get(&triple.predicate_id) {
if o_set.contains(&triple.object_id) {
return false; }
}
}
spo.entry(triple.subject_id)
.or_default()
.entry(triple.predicate_id)
.or_default()
.insert(triple.object_id);
pos.entry(triple.predicate_id)
.or_default()
.entry(triple.object_id)
.or_default()
.insert(triple.subject_id);
osp.entry(triple.object_id)
.or_default()
.entry(triple.subject_id)
.or_default()
.insert(triple.predicate_id);
*self
.triple_count
.write()
.expect("triple_count lock poisoned") += 1;
self.stats
.write()
.expect("stats lock poisoned")
.total_insertions += 1;
true
}
/// Inserts many triples under a single acquisition of every lock.
///
/// Returns one flag per input triple, in input order: `true` if that triple was newly
/// added, `false` if it was already stored or repeats an earlier element of the batch.
/// `total_insertions` grows by the number of `true` flags and `batch_insertions` by one.
pub fn batch_insert(&self, triples: &[Triple]) -> Vec<bool> {
    // All terms are interned before any index lock is taken.
    let interned: Vec<InternedTriple> = triples
        .iter()
        .map(|t| InternedTriple {
            subject_id: self.interner.intern_subject(t.subject()),
            predicate_id: self.interner.intern_predicate(t.predicate()),
            object_id: self.interner.intern_object(t.object()),
        })
        .collect();
    let mut spo = self.spo_index.write().expect("spo_index lock poisoned");
    let mut pos = self.pos_index.write().expect("pos_index lock poisoned");
    let mut osp = self.osp_index.write().expect("osp_index lock poisoned");
    let mut count = self
        .triple_count
        .write()
        .expect("triple_count lock poisoned");
    let mut stats = self.stats.write().expect("stats lock poisoned");
    let outcomes: Vec<bool> = interned
        .into_iter()
        .map(
            |InternedTriple {
                 subject_id: s,
                 predicate_id: p,
                 object_id: o,
             }| {
                // The SPO insert doubles as the duplicate check; the other two indexes
                // are only touched for genuinely new triples.
                let fresh = spo.entry(s).or_default().entry(p).or_default().insert(o);
                if fresh {
                    pos.entry(p).or_default().entry(o).or_default().insert(s);
                    osp.entry(o).or_default().entry(s).or_default().insert(p);
                }
                fresh
            },
        )
        .collect();
    let added = outcomes.iter().filter(|&&was_added| was_added).count();
    *count += added;
    stats.total_insertions += added;
    stats.batch_insertions += 1;
    outcomes
}
/// Removes `triple` from the graph, returning whether it was present.
///
/// Only existing term ids are looked up; nothing is interned. A triple that mentions a
/// term the interner has never seen is reported as absent without touching the indexes.
/// Interned terms are not released by this method.
pub fn remove(&self, triple: &Triple) -> bool {
    let Some(s_id) = self.interner.get_subject_id(triple.subject()) else {
        return false;
    };
    let Some(p_id) = self.interner.get_predicate_id(triple.predicate()) else {
        return false;
    };
    let Some(o_id) = self.interner.get_object_id(triple.object()) else {
        return false;
    };
    self.remove_interned(s_id, p_id, o_id)
}
fn remove_interned(&self, s_id: u32, p_id: u32, o_id: u32) -> bool {
let mut spo = self.spo_index.write().expect("spo_index lock poisoned");
let mut pos = self.pos_index.write().expect("pos_index lock poisoned");
let mut osp = self.osp_index.write().expect("osp_index lock poisoned");
let mut removed = false;
if let Some(po_map) = spo.get_mut(&s_id) {
if let Some(o_set) = po_map.get_mut(&p_id) {
removed = o_set.remove(&o_id);
if o_set.is_empty() {
po_map.remove(&p_id);
if po_map.is_empty() {
spo.remove(&s_id);
}
}
}
}
if removed {
if let Some(os_map) = pos.get_mut(&p_id) {
if let Some(s_set) = os_map.get_mut(&o_id) {
s_set.remove(&s_id);
if s_set.is_empty() {
os_map.remove(&o_id);
if os_map.is_empty() {
pos.remove(&p_id);
}
}
}
}
if let Some(sp_map) = osp.get_mut(&o_id) {
if let Some(p_set) = sp_map.get_mut(&s_id) {
p_set.remove(&p_id);
if p_set.is_empty() {
sp_map.remove(&s_id);
if sp_map.is_empty() {
osp.remove(&o_id);
}
}
}
}
*self
.triple_count
.write()
.expect("triple_count lock poisoned") -= 1;
self.stats
.write()
.expect("stats lock poisoned")
.total_deletions += 1;
}
removed
}
/// Returns every stored triple matching the pattern; a `None` position is a wildcard.
///
/// The pattern's shape picks one of the SPO/POS/OSP indexes (see `select_index`), and
/// that index's lookup counter is bumped even if the query then matches nothing. A bound
/// term the interner has never seen cannot match anything, so such a query returns an
/// empty vector without reading the indexes. Matches whose ids the interner can no
/// longer resolve are skipped.
pub fn query(
    &self,
    subject: Option<&Subject>,
    predicate: Option<&Predicate>,
    object: Option<&Object>,
) -> Vec<Triple> {
    let index_type =
        self.select_index(subject.is_some(), predicate.is_some(), object.is_some());
    {
        let mut stats = self.stats.write().expect("stats lock poisoned");
        let counter = match index_type {
            IndexType::SPO => &mut stats.spo_lookups,
            IndexType::POS => &mut stats.pos_lookups,
            IndexType::OSP => &mut stats.osp_lookups,
        };
        *counter += 1;
    }
    let s_id = subject.and_then(|s| self.interner.get_subject_id(s));
    let p_id = predicate.and_then(|p| self.interner.get_predicate_id(p));
    let o_id = object.and_then(|o| self.interner.get_object_id(o));
    // An id is `None` either because the position is a wildcard or because the bound
    // term is unknown. Only the latter makes the pattern unsatisfiable.
    let every_bound_term_known = subject.is_some() == s_id.is_some()
        && predicate.is_some() == p_id.is_some()
        && object.is_some() == o_id.is_some();
    if !every_bound_term_known {
        return Vec::new();
    }
    let matches = match index_type {
        IndexType::SPO => self.query_spo(s_id, p_id, o_id),
        IndexType::POS => self.query_pos(p_id, o_id, s_id),
        IndexType::OSP => self.query_osp(o_id, s_id, p_id),
    };
    matches
        .into_iter()
        .filter_map(|it| self.interned_to_triple(it))
        .collect()
}
/// Picks the permutation index for a pattern, given which positions are bound.
///
/// - Subject and predicate bound, or only the subject bound: SPO.
/// - Predicate bound without a subject: POS.
/// - Object bound without a predicate: OSP.
/// - Nothing bound: SPO, which is then scanned in full.
fn select_index(&self, has_s: bool, has_p: bool, has_o: bool) -> IndexType {
    match (has_s, has_p, has_o) {
        (true, true, _) | (true, false, false) | (false, false, false) => IndexType::SPO,
        (false, true, _) => IndexType::POS,
        (_, false, true) => IndexType::OSP,
    }
}
/// Answers a pattern from the SPO index (subject -> predicate -> objects).
///
/// When the bound positions form a prefix of S, P, O (`S P O`, `S P ?`, `S ? ?`), the
/// index is descended directly. Every other combination, including the all-wildcard
/// pattern, is a full scan that filters each level. Results come out in ascending
/// (subject, predicate, object) id order because the index is built from B-trees.
fn query_spo(
    &self,
    s_id: Option<u32>,
    p_id: Option<u32>,
    o_id: Option<u32>,
) -> Vec<InternedTriple> {
    let spo = self.spo_index.read().expect("spo_index lock poisoned");
    let make = |subject_id: u32, predicate_id: u32, object_id: u32| InternedTriple {
        subject_id,
        predicate_id,
        object_id,
    };
    let mut found = Vec::new();
    match (s_id, p_id, o_id) {
        (Some(s), Some(p), Some(o)) => {
            let present = spo
                .get(&s)
                .and_then(|po_map| po_map.get(&p))
                .is_some_and(|objects| objects.contains(&o));
            if present {
                found.push(make(s, p, o));
            }
        }
        (Some(s), Some(p), None) => {
            if let Some(objects) = spo.get(&s).and_then(|po_map| po_map.get(&p)) {
                found.extend(objects.iter().map(|&o| make(s, p, o)));
            }
        }
        (Some(s), None, None) => {
            if let Some(po_map) = spo.get(&s) {
                for (&p, objects) in po_map {
                    found.extend(objects.iter().map(|&o| make(s, p, o)));
                }
            }
        }
        // Filtered full scan; a `None` filter accepts every id, so the all-wildcard
        // pattern simply yields the whole index.
        _ => {
            let accepts = |filter: Option<u32>, id: u32| filter.map_or(true, |want| want == id);
            for (&s, po_map) in spo.iter() {
                if !accepts(s_id, s) {
                    continue;
                }
                for (&p, objects) in po_map {
                    if !accepts(p_id, p) {
                        continue;
                    }
                    found.extend(
                        objects
                            .iter()
                            .copied()
                            .filter(|&o| accepts(o_id, o))
                            .map(|o| make(s, p, o)),
                    );
                }
            }
        }
    }
    found
}
fn query_pos(
&self,
p_id: Option<u32>,
o_id: Option<u32>,
s_id: Option<u32>,
) -> Vec<InternedTriple> {
let pos = self.pos_index.read().expect("pos_index lock poisoned");
let mut results = Vec::new();
match (p_id, o_id) {
(Some(p), Some(o)) => {
if let Some(os_map) = pos.get(&p) {
if let Some(s_set) = os_map.get(&o) {
for &s in s_set {
if s_id.map_or(true, |id| id == s) {
results.push(InternedTriple {
subject_id: s,
predicate_id: p,
object_id: o,
});
}
}
}
}
}
(Some(p), None) => {
if let Some(os_map) = pos.get(&p) {
for (&o, s_set) in os_map {
for &s in s_set {
if s_id.map_or(true, |id| id == s) {
results.push(InternedTriple {
subject_id: s,
predicate_id: p,
object_id: o,
});
}
}
}
}
}
_ => {
return self.query_spo(s_id, p_id, o_id);
}
}
results
}
fn query_osp(
&self,
o_id: Option<u32>,
s_id: Option<u32>,
p_id: Option<u32>,
) -> Vec<InternedTriple> {
let osp = self.osp_index.read().expect("osp_index lock poisoned");
let mut results = Vec::new();
match (o_id, s_id) {
(Some(o), Some(s)) => {
if let Some(sp_map) = osp.get(&o) {
if let Some(p_set) = sp_map.get(&s) {
for &p in p_set {
if p_id.map_or(true, |id| id == p) {
results.push(InternedTriple {
subject_id: s,
predicate_id: p,
object_id: o,
});
}
}
}
}
}
(Some(o), None) => {
if let Some(sp_map) = osp.get(&o) {
for (&s, p_set) in sp_map {
for &p in p_set {
if p_id.map_or(true, |id| id == p) {
results.push(InternedTriple {
subject_id: s,
predicate_id: p,
object_id: o,
});
}
}
}
}
}
_ => {
return self.query_spo(s_id, p_id, o_id);
}
}
results
}
fn interned_to_triple(&self, interned: InternedTriple) -> Option<Triple> {
let subject = self.interner.get_subject(interned.subject_id)?;
let predicate = self.interner.get_predicate(interned.predicate_id)?;
let object = self.interner.get_object(interned.object_id)?;
Some(Triple::new(subject, predicate, object))
}
/// Number of distinct triples currently stored.
pub fn len(&self) -> usize {
    let count = self
        .triple_count
        .read()
        .expect("triple_count lock poisoned");
    *count
}
/// Returns `true` when the graph holds no triples.
pub fn is_empty(&self) -> bool {
    matches!(self.len(), 0)
}
/// Builds a heuristic memory report for the interner and the three indexes.
///
/// All three index read locks are held while the report is assembled, so the index
/// figures describe one consistent state.
pub fn memory_usage(&self) -> MemoryUsage {
    let spo = self.spo_index.read().expect("spo_index lock poisoned");
    let pos = self.pos_index.read().expect("pos_index lock poisoned");
    let osp = self.osp_index.read().expect("osp_index lock poisoned");
    let index_entry_count: usize = [&*spo, &*pos, &*osp]
        .iter()
        .map(|index| count_index_entries(index))
        .sum();
    MemoryUsage {
        term_interner_bytes: self.interner.memory_usage(),
        spo_index_bytes: estimate_index_memory(&spo),
        pos_index_bytes: estimate_index_memory(&pos),
        osp_index_bytes: estimate_index_memory(&osp),
        total_triple_count: self.len(),
        index_entry_count,
    }
}
/// Returns a point-in-time copy of the usage statistics.
pub fn index_stats(&self) -> IndexStats {
    let guard = self.stats.read().expect("stats lock poisoned");
    (*guard).clone()
}
/// Removes every triple and clears the interner. Usage statistics are left untouched.
///
/// The three index locks and the triple-count lock are held together for the whole
/// operation, taken in the same spo -> pos -> osp -> count order as the insert and remove
/// paths. A concurrent insert or remove therefore sees either the full graph or the empty
/// one, never a mix of cleared and uncleared indexes with a stale count.
///
/// NOTE(review): an `insert` that interned its terms before this call but reaches the
/// index locks afterwards will still store ids the cleared interner presumably no longer
/// maps. That race is not closed here.
pub fn clear(&self) {
    let mut spo = self.spo_index.write().expect("spo_index lock poisoned");
    let mut pos = self.pos_index.write().expect("pos_index lock poisoned");
    let mut osp = self.osp_index.write().expect("osp_index lock poisoned");
    let mut count = self
        .triple_count
        .write()
        .expect("triple_count lock poisoned");
    spo.clear();
    pos.clear();
    osp.clear();
    *count = 0;
    // Clear the interner while the index locks are still held. This takes locks in the
    // same order (indexes before interner) as `memory_usage`, which also consults the
    // interner while holding the index locks.
    self.interner.clear();
}
/// Borrows the interner that maps terms to the ids stored in this graph's indexes.
pub fn interner(&self) -> &Arc<TermInterner> {
    let shared: &Arc<TermInterner> = &self.interner;
    shared
}
/// Inserts `triples` using rayon and returns one flag per input triple, **in input order**.
///
/// A flag is `true` if that triple was newly added. It is `false` if the triple was
/// already stored or repeats an earlier element of the batch, matching `batch_insert`.
///
/// Terms are interned in parallel. Triples are then grouped by subject and the groups are
/// inserted in parallel. Every insert still takes all three index write locks, so the
/// index updates themselves are serialized.
#[cfg(feature = "parallel")]
pub fn par_insert_batch(&self, triples: Vec<Triple>) -> Vec<bool> {
    use rayon::prelude::*;
    let interned_triples: Vec<InternedTriple> = triples
        .par_iter()
        .map(|triple| InternedTriple {
            subject_id: self.interner.intern_subject(triple.subject()),
            predicate_id: self.interner.intern_predicate(triple.predicate()),
            object_id: self.interner.intern_object(triple.object()),
        })
        .collect();
    // Remember each triple's input position. The groups are processed in arbitrary
    // `HashMap` order, so without the position the flags could not be matched back to
    // the input. Duplicates share a subject, so they land in the same group in input
    // order, and at most the first occurrence is reported as inserted.
    let mut grouped: HashMap<u32, Vec<(usize, InternedTriple)>> = HashMap::new();
    for (position, interned) in interned_triples.into_iter().enumerate() {
        grouped
            .entry(interned.subject_id)
            .or_default()
            .push((position, interned));
    }
    let outcomes: Vec<(usize, bool)> = grouped
        .into_par_iter()
        .flat_map(|(_, group)| {
            group
                .into_iter()
                .map(|(position, interned)| (position, self.insert_interned(interned)))
                .collect::<Vec<_>>()
        })
        .collect();
    let mut results = vec![false; triples.len()];
    for (position, inserted) in outcomes {
        results[position] = inserted;
    }
    results
}
/// Removes each triple using rayon; the `i`-th flag says whether `triples[i]` was present.
///
/// Each removal goes through `remove`, which takes all three index write locks, so the
/// index updates themselves are serialized.
#[cfg(feature = "parallel")]
pub fn par_remove_batch(&self, triples: &[Triple]) -> Vec<bool> {
    use rayon::prelude::*;
    triples
        .par_iter()
        .map(|candidate| self.remove(candidate))
        .collect::<Vec<bool>>()
}
/// Runs each `(subject, predicate, object)` pattern through `query` using rayon.
///
/// Returns one result vector per pattern, in input order.
#[cfg(feature = "parallel")]
pub fn par_query_batch(
    &self,
    patterns: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
) -> Vec<Vec<Triple>> {
    use rayon::prelude::*;
    patterns
        .into_par_iter()
        .map(|pattern| {
            let (subject, predicate, object) = pattern;
            self.query(subject.as_ref(), predicate.as_ref(), object.as_ref())
        })
        .collect()
}
/// Applies `transform` to a snapshot of every triple using rayon.
///
/// Keeps only the `Some` results. The snapshot comes from `query(None, None, None)`, which
/// also bumps the SPO lookup counter. The graph itself is not modified.
#[cfg(feature = "parallel")]
pub fn par_transform<F>(&self, transform: F) -> Vec<Triple>
where
    F: Fn(&Triple) -> Option<Triple> + Send + Sync,
{
    use rayon::prelude::*;
    let snapshot = self.query(None, None, None);
    snapshot
        .into_par_iter()
        .filter_map(|original| transform(&original))
        .collect()
}
/// Maps every triple in a snapshot of the graph through `mapper` using rayon.
///
/// The snapshot comes from `query(None, None, None)`.
#[cfg(feature = "parallel")]
pub fn par_map<F, R>(&self, mapper: F) -> Vec<R>
where
    F: Fn(&Triple) -> R + Send + Sync,
    R: Send,
{
    use rayon::prelude::*;
    let snapshot = self.query(None, None, None);
    snapshot
        .into_par_iter()
        .map(|current| mapper(&current))
        .collect()
}
/// Returns the triples in a snapshot of the graph for which `predicate` returns `true`.
///
/// Evaluation runs in parallel using rayon.
#[cfg(feature = "parallel")]
pub fn par_filter<F>(&self, predicate: F) -> Vec<Triple>
where
    F: Fn(&Triple) -> bool + Send + Sync,
{
    use rayon::prelude::*;
    let snapshot = self.query(None, None, None);
    snapshot
        .into_par_iter()
        .filter(|candidate| predicate(candidate))
        .collect()
}
/// Folds a snapshot of every triple in parallel and combines the partial results with `+`.
///
/// # Precondition
///
/// `init` is cloned to seed *every* rayon fold split and is also the identity of the
/// final `reduce`. It is therefore combined into the result an unspecified number of
/// times. Results are only well defined when `init` is an identity for `+` (e.g. `0` for
/// sums) and `fold_fn(acc, t)` is consistent with `acc + fold_fn(init, t)`.
#[cfg(feature = "parallel")]
pub fn par_fold<F, R>(&self, init: R, fold_fn: F) -> R
where
    F: Fn(R, &Triple) -> R + Send + Sync,
    R: Send + Sync + Clone + 'static + std::ops::Add<Output = R>,
{
    use rayon::prelude::*;
    let seed = || init.clone();
    self.query(None, None, None)
        .into_par_iter()
        .fold(seed, |acc, triple| fold_fn(acc, &triple))
        .reduce(seed, |left, right| left + right)
}
/// Returns an iterator over a snapshot of every triple currently in the graph.
///
/// The snapshot is built eagerly via `query(None, None, None)`, which bumps the SPO lookup
/// counter. The iterator therefore borrows nothing from `self` (hence `use<>`) and does
/// not see later modifications.
pub fn iter(&self) -> impl Iterator<Item = Triple> + use<> {
    let snapshot = self.query(None, None, None);
    snapshot.into_iter()
}
pub fn match_pattern(
&self,
subject: Option<&Subject>,
predicate: Option<&Predicate>,
object: Option<&Object>,
) -> Vec<Triple> {
self.query(subject, predicate, object)
}
}
impl Default for IndexedGraph {
fn default() -> Self {
Self::new()
}
}
/// Estimated memory footprint of an `IndexedGraph`, as produced by `IndexedGraph::memory_usage`.
///
/// The per-index byte figures come from a coarse per-entry heuristic
/// (`estimate_index_memory`), not from allocator measurements.
#[derive(Debug, Clone)]
pub struct MemoryUsage {
/// Value reported by `TermInterner::memory_usage` (unit presumed to be bytes; confirm in the interner).
pub term_interner_bytes: usize,
/// Heuristic size estimate of the SPO index.
pub spo_index_bytes: usize,
/// Heuristic size estimate of the POS index.
pub pos_index_bytes: usize,
/// Heuristic size estimate of the OSP index.
pub osp_index_bytes: usize,
/// Triple count at the time the report was taken.
pub total_triple_count: usize,
/// Sum over the three indexes of their second-level entry counts (distinct key pairs).
pub index_entry_count: usize,
}
impl MemoryUsage {
    /// Sum of the interner figure and the three per-index estimates.
    ///
    /// `total_triple_count` and `index_entry_count` are not byte counts and are excluded.
    pub fn total_bytes(&self) -> usize {
        [
            self.term_interner_bytes,
            self.spo_index_bytes,
            self.pos_index_bytes,
            self.osp_index_bytes,
        ]
        .iter()
        .sum()
    }

    /// Average estimated bytes per stored triple, or `0.0` for an empty graph.
    pub fn bytes_per_triple(&self) -> f64 {
        match self.total_triple_count {
            0 => 0.0,
            triples => self.total_bytes() as f64 / triples as f64,
        }
    }
}
/// Counts the second-level entries of a three-level index, i.e. the number of distinct
/// (first key, second key) pairs. The sizes of the leaf sets are not included.
fn count_index_entries(index: &BTreeMap<u32, BTreeMap<u32, BTreeSet<u32>>>) -> usize {
    index.values().map(BTreeMap::len).sum()
}
/// Rough, heuristic byte estimate for a three-level index.
///
/// Charges a 4-byte key plus 24 bytes of assumed container overhead for every first-level
/// and second-level entry, plus 4 bytes per leaf id. The constants are approximations,
/// not measured allocator sizes.
fn estimate_index_memory(index: &BTreeMap<u32, BTreeMap<u32, BTreeSet<u32>>>) -> usize {
    const KEY_BYTES: usize = 4;
    const CONTAINER_OVERHEAD_BYTES: usize = 24;
    const ENTRY_BYTES: usize = KEY_BYTES + CONTAINER_OVERHEAD_BYTES;
    index
        .values()
        .map(|second_level| {
            let below: usize = second_level
                .values()
                .map(|leaf_set| ENTRY_BYTES + leaf_set.len() * KEY_BYTES)
                .sum();
            ENTRY_BYTES + below
        })
        .sum()
}
// Unit tests for `IndexedGraph`: insert/remove bookkeeping, batch inserts, every query
// shape, index selection, memory reporting, `clear`, and basic multi-threaded use.
#[cfg(test)]
mod tests {
use super::*;
use crate::model::{Literal, NamedNode};
/// Builds a triple with IRI subject/predicate and a plain literal object.
fn create_test_triple(s: &str, p: &str, o: &str) -> Triple {
Triple::new(
NamedNode::new(s).expect("valid IRI"),
NamedNode::new(p).expect("valid IRI"),
Literal::new(o),
)
}
// Insert is idempotent, a subject query finds the triple, and remove reports presence.
#[test]
fn test_basic_operations() {
let graph = IndexedGraph::new();
let triple =
create_test_triple("http://example.org/s1", "http://example.org/p1", "object1");
assert!(graph.insert(&triple));
// A second insert of the same triple is rejected and does not change the count.
assert!(!graph.insert(&triple));
assert_eq!(graph.len(), 1);
let results = graph.query(Some(triple.subject()), None, None);
assert_eq!(results.len(), 1);
assert_eq!(results[0], triple);
assert!(graph.remove(&triple));
// Removing an absent triple reports `false` and leaves the count at zero.
assert!(!graph.remove(&triple));
assert_eq!(graph.len(), 0);
}
// `batch_insert` returns per-triple flags in input order; the repeated triple is `false`.
#[test]
fn test_batch_insert() {
let graph = IndexedGraph::new();
let triples = vec![
create_test_triple("http://example.org/s1", "http://example.org/p1", "o1"),
create_test_triple("http://example.org/s1", "http://example.org/p2", "o2"),
create_test_triple("http://example.org/s2", "http://example.org/p1", "o3"),
// Duplicate of the first element.
create_test_triple("http://example.org/s1", "http://example.org/p1", "o1"),
];
let results = graph.batch_insert(&triples);
assert_eq!(results, vec![true, true, true, false]);
assert_eq!(graph.len(), 3);
}
// Exercises every combination of bound/unbound positions against a four-triple graph.
#[test]
fn test_query_patterns() {
let graph = IndexedGraph::new();
let triples = vec![
create_test_triple("http://example.org/s1", "http://example.org/p1", "o1"),
create_test_triple("http://example.org/s1", "http://example.org/p2", "o2"),
create_test_triple("http://example.org/s2", "http://example.org/p1", "o3"),
create_test_triple("http://example.org/s2", "http://example.org/p2", "o4"),
];
for triple in &triples {
graph.insert(triple);
}
// S ? ? (SPO index)
let s1 = Subject::NamedNode(NamedNode::new("http://example.org/s1").expect("valid IRI"));
let results = graph.query(Some(&s1), None, None);
assert_eq!(results.len(), 2);
// ? P ? (POS index)
let p1 = Predicate::NamedNode(NamedNode::new("http://example.org/p1").expect("valid IRI"));
let results = graph.query(None, Some(&p1), None);
assert_eq!(results.len(), 2);
// ? ? O (OSP index)
let o1 = Object::Literal(Literal::new("o1"));
let results = graph.query(None, None, Some(&o1));
assert_eq!(results.len(), 1);
// S P ? (SPO index)
let results = graph.query(Some(&s1), Some(&p1), None);
assert_eq!(results.len(), 1);
// ? P O (POS index)
let results = graph.query(None, Some(&p1), Some(&o1));
assert_eq!(results.len(), 1);
// S ? O (OSP index)
let o2 = Object::Literal(Literal::new("o2"));
let results = graph.query(Some(&s1), None, Some(&o2));
assert_eq!(results.len(), 1);
// S P O (SPO membership test)
let results = graph.query(Some(&s1), Some(&p1), Some(&o1));
assert_eq!(results.len(), 1);
// ? ? ? (full SPO scan)
let results = graph.query(None, None, None);
assert_eq!(results.len(), 4);
}
// Pins the pattern-shape -> index mapping used by `query`.
#[test]
fn test_index_selection() {
let graph = IndexedGraph::new();
assert_eq!(graph.select_index(true, true, true), IndexType::SPO);
assert_eq!(graph.select_index(true, true, false), IndexType::SPO);
assert_eq!(graph.select_index(true, false, false), IndexType::SPO);
assert_eq!(graph.select_index(false, true, true), IndexType::POS);
assert_eq!(graph.select_index(false, true, false), IndexType::POS);
assert_eq!(graph.select_index(true, false, true), IndexType::OSP);
assert_eq!(graph.select_index(false, false, true), IndexType::OSP);
}
// The memory report reflects the triple count and yields positive estimates.
#[test]
fn test_memory_usage() {
let graph = IndexedGraph::new();
for i in 0..10 {
let triple = create_test_triple(
&format!("http://example.org/s{i}"),
"http://example.org/p1",
&format!("object{i}"),
);
graph.insert(&triple);
}
let usage = graph.memory_usage();
assert_eq!(usage.total_triple_count, 10);
assert!(usage.total_bytes() > 0);
assert!(usage.bytes_per_triple() > 0.0);
}
// After `clear` the graph is empty and the same triple can be inserted again.
#[test]
fn test_clear() {
let graph = IndexedGraph::new();
let triple = create_test_triple("http://example.org/s1", "http://example.org/p1", "o1");
graph.insert(&triple);
assert_eq!(graph.len(), 1);
graph.clear();
assert_eq!(graph.len(), 0);
assert!(graph.is_empty());
assert!(graph.insert(&triple));
assert_eq!(graph.len(), 1);
}
// Ten threads insert distinct triples, then ten threads each read all of them.
#[test]
fn test_concurrent_access() {
use std::sync::Arc;
use std::thread;
let graph = Arc::new(IndexedGraph::new());
let mut handles = vec![];
for i in 0..10 {
let graph_clone = Arc::clone(&graph);
let handle = thread::spawn(move || {
let triple = create_test_triple(
&format!("http://example.org/s{i}"),
"http://example.org/p1",
&format!("o{i}"),
);
graph_clone.insert(&triple)
});
handles.push(handle);
}
for handle in handles {
handle.join().expect("thread should not panic");
}
assert_eq!(graph.len(), 10);
let mut handles = vec![];
for _ in 0..10 {
let graph_clone = Arc::clone(&graph);
let handle = thread::spawn(move || graph_clone.query(None, None, None).len());
handles.push(handle);
}
for handle in handles {
let count = handle.join().expect("thread should not panic");
assert_eq!(count, 10);
}
}
}