use crate::model::{BlankNode, Literal, NamedNode, Object, Predicate, Subject};
use anyhow::{Context, Result};
use bimap::BiMap;
use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Write};
use std::path::Path;
use std::sync::{Arc, RwLock};
/// Thread-safe table that maps RDF terms to compact `u32` IDs and back.
///
/// Subjects, predicates and objects each have their own bidirectional map and
/// their own ID counter (all counters start at 0), so the same numeric ID can
/// appear in more than one of the three tables.
#[derive(Debug)]
pub struct TermInterner {
/// ID <-> subject term.
subjects: Arc<RwLock<BiMap<u32, SubjectTerm>>>,
/// ID <-> predicate IRI string.
predicates: Arc<RwLock<BiMap<u32, String>>>,
/// ID <-> object term.
objects: Arc<RwLock<BiMap<u32, ObjectTerm>>>,
/// Next ID to hand out for a new subject.
next_subject_id: Arc<RwLock<u32>>,
/// Next ID to hand out for a new predicate.
next_predicate_id: Arc<RwLock<u32>>,
/// Next ID to hand out for a new object.
next_object_id: Arc<RwLock<u32>>,
/// Counters updated by the `intern_*` methods (lookups, hits, sizes).
stats: Arc<RwLock<InternerStats>>,
}
/// Owned, hashable and serializable form of a subject stored in the interner.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum SubjectTerm {
/// A named node, stored as its IRI string.
NamedNode(String),
/// A blank node, stored as its identifier string.
BlankNode(String),
}
/// Owned, hashable and serializable form of an object stored in the interner.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ObjectTerm {
/// A named node, stored as its IRI string.
NamedNode(String),
/// A blank node, stored as its identifier string.
BlankNode(String),
/// A literal, split into its components.
Literal {
/// Lexical value of the literal.
value: String,
/// Datatype IRI, when one is recorded.
datatype: Option<String>,
/// Language tag, when the literal is language-tagged.
language: Option<String>,
},
}
/// Usage counters reported by the term interner.
#[derive(Debug, Clone, Default)]
pub struct InternerStats {
    /// Number of distinct subjects.
    pub subject_count: usize,
    /// Number of distinct predicates.
    pub predicate_count: usize,
    /// Number of distinct objects.
    pub object_count: usize,
    /// Number of lookups recorded.
    pub total_lookups: usize,
    /// Number of recorded lookups that found an existing entry.
    pub cache_hits: usize,
    /// Estimated size of the stored terms, in bytes.
    pub memory_bytes: usize,
}

impl InternerStats {
    /// Fraction of lookups that were hits, or `0.0` when nothing was looked up.
    pub fn hit_ratio(&self) -> f64 {
        match self.total_lookups {
            0 => 0.0,
            lookups => self.cache_hits as f64 / lookups as f64,
        }
    }

    /// Sum of the subject, predicate and object counts.
    pub fn total_terms(&self) -> usize {
        let subjects_and_predicates = self.subject_count + self.predicate_count;
        subjects_and_predicates + self.object_count
    }
}
impl TermInterner {
pub fn new() -> Self {
TermInterner {
subjects: Arc::new(RwLock::new(BiMap::new())),
predicates: Arc::new(RwLock::new(BiMap::new())),
objects: Arc::new(RwLock::new(BiMap::new())),
next_subject_id: Arc::new(RwLock::new(0)),
next_predicate_id: Arc::new(RwLock::new(0)),
next_object_id: Arc::new(RwLock::new(0)),
stats: Arc::new(RwLock::new(InternerStats::default())),
}
}
/// Returns the ID of `subject`, assigning the next free subject ID the first
/// time the term is seen.
///
/// Every call bumps `total_lookups`; calls that find an existing entry also
/// bump `cache_hits`, and calls that insert bump `subject_count` and
/// `memory_bytes`.
///
/// # Panics
///
/// * if `subject` is a variable or a quoted triple;
/// * if one of the internal locks is poisoned;
/// * if the `u32` subject ID space is exhausted (instead of silently wrapping
///   around and overwriting existing entries).
pub fn intern_subject(&self, subject: &Subject) -> u32 {
    let term = match subject {
        Subject::NamedNode(n) => SubjectTerm::NamedNode(n.as_str().to_string()),
        Subject::BlankNode(b) => SubjectTerm::BlankNode(b.id().to_string()),
        Subject::Variable(_) | Subject::QuotedTriple(_) => {
            panic!("Variables and quoted triples cannot be interned in storage")
        }
    };

    // Fast path: the term is already known, a shared lock is enough.
    {
        let subjects = self.subjects.read().expect("subjects lock poisoned");
        if let Some(&id) = subjects.get_by_right(&term) {
            let mut stats = self.stats.write().expect("stats lock poisoned");
            stats.total_lookups += 1;
            stats.cache_hits += 1;
            return id;
        }
    }

    let mut subjects = self.subjects.write().expect("subjects lock poisoned");
    // Check again: another thread may have inserted the term between the
    // release of the read lock and the acquisition of the write lock.
    if let Some(&id) = subjects.get_by_right(&term) {
        let mut stats = self.stats.write().expect("stats lock poisoned");
        stats.total_lookups += 1;
        stats.cache_hits += 1;
        return id;
    }

    let mut next_id = self
        .next_subject_id
        .write()
        .expect("next_subject_id lock poisoned");
    let id = *next_id;
    // A wrapped counter would re-issue IDs that are already in use.
    *next_id = id.checked_add(1).expect("subject ID space exhausted");

    // Measure before the term is moved into the map; no clone needed.
    let term_bytes = estimate_subject_memory(&term);
    subjects.insert(id, term);

    let mut stats = self.stats.write().expect("stats lock poisoned");
    stats.total_lookups += 1;
    stats.subject_count += 1;
    stats.memory_bytes += term_bytes;
    id
}
/// Returns the ID of `predicate`, assigning the next free predicate ID the
/// first time the IRI is seen.
///
/// Every call bumps `total_lookups`; calls that find an existing entry also
/// bump `cache_hits`, and calls that insert bump `predicate_count` and
/// `memory_bytes`.
///
/// # Panics
///
/// * if `predicate` is a variable;
/// * if one of the internal locks is poisoned;
/// * if the `u32` predicate ID space is exhausted (instead of silently
///   wrapping around and overwriting existing entries).
pub fn intern_predicate(&self, predicate: &Predicate) -> u32 {
    let iri = match predicate {
        Predicate::NamedNode(n) => n.as_str().to_string(),
        Predicate::Variable(_) => {
            panic!("Variables cannot be interned as predicates in storage")
        }
    };

    // Fast path: the IRI is already known, a shared lock is enough.
    {
        let predicates = self.predicates.read().expect("predicates lock poisoned");
        if let Some(&id) = predicates.get_by_right(&iri) {
            let mut stats = self.stats.write().expect("stats lock poisoned");
            stats.total_lookups += 1;
            stats.cache_hits += 1;
            return id;
        }
    }

    let mut predicates = self.predicates.write().expect("predicates lock poisoned");
    // Check again: another thread may have inserted the IRI between the
    // release of the read lock and the acquisition of the write lock.
    if let Some(&id) = predicates.get_by_right(&iri) {
        let mut stats = self.stats.write().expect("stats lock poisoned");
        stats.total_lookups += 1;
        stats.cache_hits += 1;
        return id;
    }

    let mut next_id = self
        .next_predicate_id
        .write()
        .expect("next_predicate_id lock poisoned");
    let id = *next_id;
    // A wrapped counter would re-issue IDs that are already in use.
    *next_id = id.checked_add(1).expect("predicate ID space exhausted");

    // Measure before the string is moved into the map; no clone needed.
    let iri_bytes = iri.len() + 4;
    predicates.insert(id, iri);

    let mut stats = self.stats.write().expect("stats lock poisoned");
    stats.total_lookups += 1;
    stats.predicate_count += 1;
    stats.memory_bytes += iri_bytes;
    id
}
/// Returns the ID of `object`, assigning the next free object ID the first
/// time the term is seen.
///
/// Literals are stored with their value, their datatype IRI (always
/// recorded) and their language tag, if any.
///
/// Every call bumps `total_lookups`; calls that find an existing entry also
/// bump `cache_hits`, and calls that insert bump `object_count` and
/// `memory_bytes`.
///
/// # Panics
///
/// * if `object` is a variable or a quoted triple;
/// * if one of the internal locks is poisoned;
/// * if the `u32` object ID space is exhausted (instead of silently wrapping
///   around and overwriting existing entries).
pub fn intern_object(&self, object: &Object) -> u32 {
    let term = match object {
        Object::NamedNode(n) => ObjectTerm::NamedNode(n.as_str().to_string()),
        Object::BlankNode(b) => ObjectTerm::BlankNode(b.id().to_string()),
        Object::Literal(l) => ObjectTerm::Literal {
            value: l.value().to_string(),
            datatype: Some(l.datatype().as_str().to_string()),
            language: l.language().map(|lang| lang.to_string()),
        },
        Object::Variable(_) | Object::QuotedTriple(_) => {
            panic!("Variables and quoted triples cannot be interned in storage")
        }
    };

    // Fast path: the term is already known, a shared lock is enough.
    {
        let objects = self.objects.read().expect("objects lock poisoned");
        if let Some(&id) = objects.get_by_right(&term) {
            let mut stats = self.stats.write().expect("stats lock poisoned");
            stats.total_lookups += 1;
            stats.cache_hits += 1;
            return id;
        }
    }

    let mut objects = self.objects.write().expect("objects lock poisoned");
    // Check again: another thread may have inserted the term between the
    // release of the read lock and the acquisition of the write lock.
    if let Some(&id) = objects.get_by_right(&term) {
        let mut stats = self.stats.write().expect("stats lock poisoned");
        stats.total_lookups += 1;
        stats.cache_hits += 1;
        return id;
    }

    let mut next_id = self
        .next_object_id
        .write()
        .expect("next_object_id lock poisoned");
    let id = *next_id;
    // A wrapped counter would re-issue IDs that are already in use.
    *next_id = id.checked_add(1).expect("object ID space exhausted");

    // Measure before the term is moved into the map; no clone needed.
    let term_bytes = estimate_object_memory(&term);
    objects.insert(id, term);

    let mut stats = self.stats.write().expect("stats lock poisoned");
    stats.total_lookups += 1;
    stats.object_count += 1;
    stats.memory_bytes += term_bytes;
    id
}
/// Rebuilds the subject stored under `id`, or returns `None` for an unknown ID.
///
/// # Panics
///
/// Panics if the subjects lock is poisoned or if the stored string is
/// rejected by the `NamedNode` / `BlankNode` constructor.
pub fn get_subject(&self, id: u32) -> Option<Subject> {
    let table = self.subjects.read().expect("subjects lock poisoned");
    let stored = table.get_by_left(&id)?;
    let subject = match stored {
        SubjectTerm::NamedNode(iri) => {
            let node = NamedNode::new(iri).expect("interned IRI should be valid");
            Subject::NamedNode(node)
        }
        SubjectTerm::BlankNode(blank_id) => {
            let node =
                BlankNode::new(blank_id).expect("interned blank node ID should be valid");
            Subject::BlankNode(node)
        }
    };
    Some(subject)
}
/// Looks up the ID of `subject` without interning it or touching the stats.
///
/// Returns `None` when the subject has not been interned.
///
/// # Panics
///
/// Panics if `subject` is a variable or a quoted triple, or if the subjects
/// lock is poisoned.
pub fn get_subject_id(&self, subject: &Subject) -> Option<u32> {
    let key = match subject {
        Subject::NamedNode(node) => SubjectTerm::NamedNode(node.as_str().to_string()),
        Subject::BlankNode(node) => SubjectTerm::BlankNode(node.id().to_string()),
        Subject::Variable(_) | Subject::QuotedTriple(_) => {
            panic!("Variables and quoted triples cannot be interned in storage")
        }
    };

    let table = self.subjects.read().expect("subjects lock poisoned");
    let found = table.get_by_right(&key).copied();
    found
}
/// Rebuilds the predicate stored under `id`, or returns `None` for an unknown ID.
///
/// # Panics
///
/// Panics if the predicates lock is poisoned or if the stored IRI is rejected
/// by `NamedNode::new`.
pub fn get_predicate(&self, id: u32) -> Option<Predicate> {
    let table = self.predicates.read().expect("predicates lock poisoned");
    let iri = table.get_by_left(&id)?;
    let node = NamedNode::new(iri).expect("interned IRI should be valid");
    Some(Predicate::NamedNode(node))
}
/// Looks up the ID of `predicate` without interning it or touching the stats.
///
/// Returns `None` when the predicate has not been interned.
///
/// # Panics
///
/// Panics if `predicate` is a variable, or if the predicates lock is poisoned.
pub fn get_predicate_id(&self, predicate: &Predicate) -> Option<u32> {
    let key = match predicate {
        Predicate::NamedNode(node) => node.as_str().to_string(),
        Predicate::Variable(_) => {
            panic!("Variables cannot be interned as predicates in storage")
        }
    };

    let table = self.predicates.read().expect("predicates lock poisoned");
    let found = table.get_by_right(&key).copied();
    found
}
/// Rebuilds the object stored under `id`, or returns `None` for an unknown ID.
///
/// For literals, a recorded language tag takes precedence over a recorded
/// datatype; with neither, a plain literal is built from the value alone.
///
/// # Panics
///
/// Panics if the objects lock is poisoned or if a stored component is
/// rejected by the corresponding term constructor.
pub fn get_object(&self, id: u32) -> Option<Object> {
    let table = self.objects.read().expect("objects lock poisoned");
    let stored = table.get_by_left(&id)?;

    let object = match stored {
        ObjectTerm::NamedNode(iri) => {
            Object::NamedNode(NamedNode::new(iri).expect("interned IRI should be valid"))
        }
        ObjectTerm::BlankNode(label) => Object::BlankNode(
            BlankNode::new(label).expect("interned blank node ID should be valid"),
        ),
        ObjectTerm::Literal {
            value,
            datatype,
            language,
        } => {
            let literal = match (language, datatype) {
                (Some(lang), _) => Literal::new_language_tagged_literal(value, lang)
                    .expect("interned language tag should be valid"),
                (None, Some(dt)) => Literal::new_typed(
                    value,
                    NamedNode::new(dt).expect("interned datatype IRI should be valid"),
                ),
                (None, None) => Literal::new(value),
            };
            Object::Literal(literal)
        }
    };
    Some(object)
}
/// Looks up the ID of `object` without interning it or touching the stats.
///
/// The lookup key is built the same way `intern_object` builds it (value,
/// datatype IRI, optional language tag). Returns `None` when the object has
/// not been interned.
///
/// # Panics
///
/// Panics if `object` is a variable or a quoted triple, or if the objects
/// lock is poisoned.
pub fn get_object_id(&self, object: &Object) -> Option<u32> {
    let key = match object {
        Object::NamedNode(node) => ObjectTerm::NamedNode(node.as_str().to_string()),
        Object::BlankNode(node) => ObjectTerm::BlankNode(node.id().to_string()),
        Object::Literal(lit) => {
            let value = lit.value().to_string();
            let datatype = Some(lit.datatype().as_str().to_string());
            let language = lit.language().map(|lang| lang.to_string());
            ObjectTerm::Literal {
                value,
                datatype,
                language,
            }
        }
        Object::Variable(_) | Object::QuotedTriple(_) => {
            panic!("Variables and quoted triples cannot be interned in storage")
        }
    };

    let table = self.objects.read().expect("objects lock poisoned");
    let found = table.get_by_right(&key).copied();
    found
}
pub fn stats(&self) -> InternerStats {
self.stats.read().expect("stats lock poisoned").clone()
}
/// Removes every term, resets the three ID counters to 0 and zeroes the stats.
///
/// The three table write locks are taken first and held until the function
/// returns, so the tables and counters are reset together.
///
/// # Panics
///
/// Panics if any of the internal locks is poisoned.
pub fn clear(&self) {
    let mut subject_table = self.subjects.write().expect("subjects lock poisoned");
    let mut predicate_table = self.predicates.write().expect("predicates lock poisoned");
    let mut object_table = self.objects.write().expect("objects lock poisoned");
    subject_table.clear();
    predicate_table.clear();
    object_table.clear();

    let counters = [
        (&self.next_subject_id, "next_subject_id lock poisoned"),
        (&self.next_predicate_id, "next_predicate_id lock poisoned"),
        (&self.next_object_id, "next_object_id lock poisoned"),
    ];
    for &(counter, poisoned) in &counters {
        *counter.write().expect(poisoned) = 0;
    }

    let mut stats = self.stats.write().expect("stats lock poisoned");
    *stats = InternerStats::default();
}
/// Returns the `memory_bytes` estimate currently recorded in the stats.
///
/// # Panics
///
/// Panics if the stats lock is poisoned.
pub fn memory_usage(&self) -> usize {
    let guard = self.stats.read().expect("stats lock poisoned");
    guard.memory_bytes
}
/// Interns `node` as a subject and returns its subject ID widened to `u64`.
pub fn intern_named_node(&self, node: &NamedNode) -> u64 {
    let as_subject = Subject::NamedNode(node.clone());
    u64::from(self.intern_subject(&as_subject))
}
/// Interns `node` as a subject and returns its subject ID widened to `u64`.
pub fn intern_blank_node(&self, node: &BlankNode) -> u64 {
    let as_subject = Subject::BlankNode(node.clone());
    u64::from(self.intern_subject(&as_subject))
}
/// Interns `literal` as an object and returns its object ID widened to `u64`.
pub fn intern_literal(&self, literal: &Literal) -> u64 {
    let as_object = Object::Literal(literal.clone());
    u64::from(self.intern_object(&as_object))
}
/// Looks up `node` in the subject table; returns its ID widened to `u64`,
/// or `None` if it has not been interned as a subject.
pub fn get_named_node_id(&self, node: &NamedNode) -> Option<u64> {
    let as_subject = Subject::NamedNode(node.clone());
    self.get_subject_id(&as_subject).map(u64::from)
}
/// Looks up `node` in the subject table; returns its ID widened to `u64`,
/// or `None` if it has not been interned as a subject.
pub fn get_blank_node_id(&self, node: &BlankNode) -> Option<u64> {
    let as_subject = Subject::BlankNode(node.clone());
    self.get_subject_id(&as_subject).map(u64::from)
}
/// Looks up `literal` in the object table; returns its ID widened to `u64`,
/// or `None` if it has not been interned as an object.
pub fn get_literal_id(&self, literal: &Literal) -> Option<u64> {
    let as_object = Object::Literal(literal.clone());
    self.get_object_id(&as_object).map(u64::from)
}
/// Writes the interner to `path`, creating or truncating the file.
///
/// The file contains, in order: the subject entries, the predicate entries,
/// the object entries (each as a list of `(id, term)` pairs), then the next
/// subject, predicate and object IDs. Statistics are not written.
///
/// All tables and counters are copied while the three table read locks are
/// held together, so the persisted counters always match the persisted
/// tables even if `clear` or an `intern_*` call runs concurrently. The locks
/// are released before any data is encoded or written.
///
/// # Errors
///
/// Returns an error if the file cannot be created, if encoding any section
/// fails, or if the final flush fails.
///
/// # Panics
///
/// Panics if any of the internal locks is poisoned.
pub fn save<P: AsRef<Path>>(&self, path: P) -> Result<()> {
    let file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(path)
        .context("Failed to create terms file")?;
    let mut writer = BufWriter::new(file);

    // Lock order (subjects, predicates, objects, counters) matches `clear`.
    let (subject_data, predicate_data, object_data, next_ids) = {
        let subjects = self.subjects.read().expect("subjects lock poisoned");
        let predicates = self.predicates.read().expect("predicates lock poisoned");
        let objects = self.objects.read().expect("objects lock poisoned");

        let subject_data: Vec<(u32, SubjectTerm)> = subjects
            .iter()
            .map(|(id, term)| (*id, term.clone()))
            .collect();
        let predicate_data: Vec<(u32, String)> = predicates
            .iter()
            .map(|(id, iri)| (*id, iri.clone()))
            .collect();
        let object_data: Vec<(u32, ObjectTerm)> = objects
            .iter()
            .map(|(id, term)| (*id, term.clone()))
            .collect();

        // Writers bump or reset a counter only while holding the matching
        // table's write lock, so these reads agree with the copies above.
        let next_ids: [u32; 3] = [
            *self
                .next_subject_id
                .read()
                .expect("next_subject_id lock poisoned"),
            *self
                .next_predicate_id
                .read()
                .expect("next_predicate_id lock poisoned"),
            *self
                .next_object_id
                .read()
                .expect("next_object_id lock poisoned"),
        ];

        (subject_data, predicate_data, object_data, next_ids)
    };

    oxicode::serde::encode_into_std_write(
        &subject_data,
        &mut writer,
        oxicode::config::standard(),
    )
    .map_err(|e| anyhow::anyhow!("Failed to serialize subjects: {}", e))?;
    oxicode::serde::encode_into_std_write(
        &predicate_data,
        &mut writer,
        oxicode::config::standard(),
    )
    .map_err(|e| anyhow::anyhow!("Failed to serialize predicates: {}", e))?;
    oxicode::serde::encode_into_std_write(
        &object_data,
        &mut writer,
        oxicode::config::standard(),
    )
    .map_err(|e| anyhow::anyhow!("Failed to serialize objects: {}", e))?;

    let labels = ["subject", "predicate", "object"];
    for (next_id, label) in next_ids.iter().zip(labels.iter()) {
        oxicode::serde::encode_into_std_write(
            next_id,
            &mut writer,
            oxicode::config::standard(),
        )
        .map_err(|e| anyhow::anyhow!("Failed to serialize next {} ID: {}", label, e))?;
    }

    // Flush explicitly: dropping a `BufWriter` discards any write error.
    writer.flush().context("Failed to flush terms file")?;
    Ok(())
}
/// Reads an interner previously written by `save` from `path`.
///
/// Hit and lookup counters start at zero. The term counts and the
/// `memory_bytes` estimate are recomputed from the loaded tables, so
/// `memory_usage` matches what interning the same terms would have recorded.
///
/// Each ID counter is raised, if necessary, to one past the largest loaded
/// ID of its table, so a file with stale counters cannot cause later
/// `intern_*` calls to reuse an ID that is already taken.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or if any section fails to
/// decode.
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self> {
    let file = File::open(path).context("Failed to open terms file")?;
    let mut reader = BufReader::new(file);

    let (subject_data, _): (Vec<(u32, SubjectTerm)>, _) =
        oxicode::serde::decode_from_std_read(&mut reader, oxicode::config::standard())
            .map_err(|e| anyhow::anyhow!("Failed to deserialize subjects: {}", e))?;
    let mut subjects = BiMap::new();
    for (id, term) in subject_data {
        subjects.insert(id, term);
    }

    let (predicate_data, _): (Vec<(u32, String)>, _) =
        oxicode::serde::decode_from_std_read(&mut reader, oxicode::config::standard())
            .map_err(|e| anyhow::anyhow!("Failed to deserialize predicates: {}", e))?;
    let mut predicates = BiMap::new();
    for (id, iri) in predicate_data {
        predicates.insert(id, iri);
    }

    let (object_data, _): (Vec<(u32, ObjectTerm)>, _) =
        oxicode::serde::decode_from_std_read(&mut reader, oxicode::config::standard())
            .map_err(|e| anyhow::anyhow!("Failed to deserialize objects: {}", e))?;
    let mut objects = BiMap::new();
    for (id, term) in object_data {
        objects.insert(id, term);
    }

    let (next_subject_id, _): (u32, _) =
        oxicode::serde::decode_from_std_read(&mut reader, oxicode::config::standard())
            .map_err(|e| anyhow::anyhow!("Failed to deserialize next subject ID: {}", e))?;
    let (next_predicate_id, _): (u32, _) =
        oxicode::serde::decode_from_std_read(&mut reader, oxicode::config::standard())
            .map_err(|e| anyhow::anyhow!("Failed to deserialize next predicate ID: {}", e))?;
    let (next_object_id, _): (u32, _) =
        oxicode::serde::decode_from_std_read(&mut reader, oxicode::config::standard())
            .map_err(|e| anyhow::anyhow!("Failed to deserialize next object ID: {}", e))?;

    // Smallest counter value that cannot collide with an ID already in a table.
    let first_free_id = |max_id: Option<u32>| max_id.map_or(0, |id| id.saturating_add(1));
    let next_subject_id =
        next_subject_id.max(first_free_id(subjects.iter().map(|(id, _)| *id).max()));
    let next_predicate_id =
        next_predicate_id.max(first_free_id(predicates.iter().map(|(id, _)| *id).max()));
    let next_object_id =
        next_object_id.max(first_free_id(objects.iter().map(|(id, _)| *id).max()));

    // Same per-term estimates the `intern_*` methods add on insertion.
    let subject_bytes: usize = subjects
        .iter()
        .map(|(_, term)| estimate_subject_memory(term))
        .sum();
    let predicate_bytes: usize = predicates.iter().map(|(_, iri)| iri.len() + 4).sum();
    let object_bytes: usize = objects
        .iter()
        .map(|(_, term)| estimate_object_memory(term))
        .sum();

    let stats = InternerStats {
        subject_count: subjects.len(),
        predicate_count: predicates.len(),
        object_count: objects.len(),
        memory_bytes: subject_bytes + predicate_bytes + object_bytes,
        ..Default::default()
    };

    Ok(TermInterner {
        subjects: Arc::new(RwLock::new(subjects)),
        predicates: Arc::new(RwLock::new(predicates)),
        objects: Arc::new(RwLock::new(objects)),
        next_subject_id: Arc::new(RwLock::new(next_subject_id)),
        next_predicate_id: Arc::new(RwLock::new(next_predicate_id)),
        next_object_id: Arc::new(RwLock::new(next_object_id)),
        stats: Arc::new(RwLock::new(stats)),
    })
}
}
impl Default for TermInterner {
fn default() -> Self {
Self::new()
}
}
/// Estimates the bytes charged for one stored subject: the length of its
/// string plus a fixed 4 + 8 bytes.
fn estimate_subject_memory(term: &SubjectTerm) -> usize {
    // Both variants carry a single string and are charged identically.
    let text = match term {
        SubjectTerm::NamedNode(text) | SubjectTerm::BlankNode(text) => text,
    };
    text.len() + 4 + 8
}
/// Estimates the bytes charged for one stored object.
///
/// Named and blank nodes cost their string length plus 4 + 8 bytes; literals
/// cost the lengths of the value, datatype and language strings that are
/// present plus 4 + 24 bytes.
fn estimate_object_memory(term: &ObjectTerm) -> usize {
    match term {
        ObjectTerm::NamedNode(text) | ObjectTerm::BlankNode(text) => text.len() + 4 + 8,
        ObjectTerm::Literal {
            value,
            datatype,
            language,
        } => {
            let datatype_len = datatype.as_deref().map_or(0, str::len);
            let language_len = language.as_deref().map_or(0, str::len);
            value.len() + datatype_len + language_len + 4 + 24
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Equal subjects share an ID, different subjects do not, and IDs resolve
    /// back to the original terms.
    #[test]
    fn test_subject_interning() {
        let interner = TermInterner::new();
        let subject1 =
            Subject::NamedNode(NamedNode::new("http://example.org/s1").expect("valid IRI"));
        let subject2 = Subject::BlankNode(BlankNode::new("b1").expect("valid blank node id"));
        let subject3 =
            Subject::NamedNode(NamedNode::new("http://example.org/s1").expect("valid IRI"));
        let id1 = interner.intern_subject(&subject1);
        let id2 = interner.intern_subject(&subject2);
        let id3 = interner.intern_subject(&subject3);
        assert_eq!(id1, id3);
        assert_ne!(id1, id2);
        assert_eq!(interner.get_subject(id1), Some(subject1.clone()));
        assert_eq!(interner.get_subject(id2), Some(subject2));
        let stats = interner.stats();
        assert_eq!(stats.subject_count, 2);
        // Only the third call found an existing entry.
        assert_eq!(stats.cache_hits, 1);
    }

    /// Equal predicates share an ID and IDs resolve back to the original IRIs.
    #[test]
    fn test_predicate_interning() {
        let interner = TermInterner::new();
        let pred1 =
            Predicate::NamedNode(NamedNode::new("http://example.org/p1").expect("valid IRI"));
        let pred2 =
            Predicate::NamedNode(NamedNode::new("http://example.org/p2").expect("valid IRI"));
        let pred3 =
            Predicate::NamedNode(NamedNode::new("http://example.org/p1").expect("valid IRI"));
        let id1 = interner.intern_predicate(&pred1);
        let id2 = interner.intern_predicate(&pred2);
        let id3 = interner.intern_predicate(&pred3);
        assert_eq!(id1, id3);
        assert_ne!(id1, id2);
        assert_eq!(interner.get_predicate(id1), Some(pred1));
        assert_eq!(interner.get_predicate(id2), Some(pred2));
    }

    /// Equal objects share an ID and IDs resolve back to the original terms.
    #[test]
    fn test_object_interning() {
        let interner = TermInterner::new();
        let obj1 = Object::Literal(Literal::new("test"));
        let obj2 = Object::NamedNode(NamedNode::new("http://example.org/o1").expect("valid IRI"));
        let obj3 = Object::Literal(Literal::new("test"));
        let id1 = interner.intern_object(&obj1);
        let id2 = interner.intern_object(&obj2);
        let id3 = interner.intern_object(&obj3);
        assert_eq!(id1, id3);
        assert_ne!(id1, id2);
        assert_eq!(interner.get_object(id1), Some(obj1));
        assert_eq!(interner.get_object(id2), Some(obj2));
    }

    /// `clear` drops every term, zeroes the stats and restarts ID allocation.
    #[test]
    fn test_clear() {
        let interner = TermInterner::new();
        let subject =
            Subject::NamedNode(NamedNode::new("http://example.org/s1").expect("valid IRI"));
        let id = interner.intern_subject(&subject);
        assert!(interner.get_subject(id).is_some());
        interner.clear();
        assert!(interner.get_subject(id).is_none());
        assert_eq!(interner.stats().total_terms(), 0);
        assert_eq!(interner.memory_usage(), 0);
        // The counter was reset, so the first term gets the first ID again.
        assert_eq!(interner.intern_subject(&subject), id);
    }

    /// Ten threads intern three distinct IRIs; exactly three IDs must exist.
    #[test]
    fn test_concurrent_access() {
        use std::sync::Arc;
        use std::thread;
        let interner = Arc::new(TermInterner::new());
        let mut handles = vec![];
        for i in 0..10 {
            let interner_clone = Arc::clone(&interner);
            let handle = thread::spawn(move || {
                let subject = Subject::NamedNode(
                    NamedNode::new(format!("http://example.org/s{}", i % 3))
                        .expect("valid IRI from format"),
                );
                interner_clone.intern_subject(&subject)
            });
            handles.push(handle);
        }
        let ids: Vec<u32> = handles
            .into_iter()
            .map(|h| h.join().expect("thread should not panic"))
            .collect();
        let unique_ids: std::collections::HashSet<_> = ids.iter().collect();
        // `i % 3` over 0..10 yields all of 0, 1 and 2, so a weaker `<= 3`
        // check would also accept every thread receiving the same ID.
        assert_eq!(unique_ids.len(), 3);

        let stats = interner.stats();
        assert_eq!(stats.subject_count, 3);
        assert_eq!(stats.total_lookups, 10);
        assert_eq!(stats.cache_hits, 7);
    }
}