use crate::interning::{InternedString, StringInterner};
use crate::model::*;
use bumpalo::Bump;
use crossbeam::epoch::{self, Atomic, Owned};
use crossbeam::queue::SegQueue;
use dashmap::DashMap;
use parking_lot::RwLock;
#[cfg(feature = "parallel")]
use rayon::iter::IntoParallelRefIterator;
#[cfg(feature = "parallel")]
use rayon::iter::ParallelIterator;
use simd_json;
use std::collections::{BTreeSet, HashMap};
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;
/// Interner used for RDF term text; an alias for the crate's general [`StringInterner`].
pub type TermInterner = StringInterner;
/// Helpers that build RDF terms whose text is first routed through a [`TermInterner`].
///
/// Fallible methods surface the error reported by the underlying `NamedNode` / `Literal`
/// constructors (for example an invalid IRI or language tag).
pub trait TermInternerExt {
/// Interns `iri` and builds a named node from it.
fn intern_named_node(&self, iri: &str) -> Result<NamedNode, crate::OxirsError>;
/// Returns a blank node (the `TermInterner` implementation creates a new unique one).
fn intern_blank_node(&self) -> BlankNode;
/// Interns `value` and builds a simple (untyped, untagged) literal from it.
fn intern_literal(&self, value: &str) -> Result<Literal, crate::OxirsError>;
/// Interns `value` and `datatype_iri` and builds a typed literal.
fn intern_literal_with_datatype(
&self,
value: &str,
datatype_iri: &str,
) -> Result<Literal, crate::OxirsError>;
/// Interns `value` and `language` and builds a language-tagged literal.
fn intern_literal_with_language(
&self,
value: &str,
language: &str,
) -> Result<Literal, crate::OxirsError>;
}
impl TermInternerExt for TermInterner {
    // NOTE(review): every method copies the interned text into a freshly owned term, so the
    // interner currently only contributes lookup statistics, not shared storage.

    fn intern_named_node(&self, iri: &str) -> Result<NamedNode, crate::OxirsError> {
        let iri_handle = self.intern(iri);
        NamedNode::new(iri_handle.as_ref())
    }

    fn intern_blank_node(&self) -> BlankNode {
        // Blank nodes carry no caller-supplied text, so nothing is interned.
        BlankNode::new_unique()
    }

    fn intern_literal(&self, value: &str) -> Result<Literal, crate::OxirsError> {
        let lexical = self.intern(value);
        let literal = Literal::new_simple_literal(lexical.as_ref());
        Ok(literal)
    }

    fn intern_literal_with_datatype(
        &self,
        value: &str,
        datatype_iri: &str,
    ) -> Result<Literal, crate::OxirsError> {
        // Intern the lexical form before the datatype, matching the established call order.
        let lexical = self.intern(value);
        let datatype_text = self.intern(datatype_iri);
        let datatype = NamedNode::new(datatype_text.as_ref())?;
        let literal = Literal::new_typed_literal(lexical.as_ref(), datatype);
        Ok(literal)
    }

    fn intern_literal_with_language(
        &self,
        value: &str,
        language: &str,
    ) -> Result<Literal, crate::OxirsError> {
        let lexical = self.intern(value);
        let tag = self.intern(language);
        Ok(Literal::new_language_tagged_literal(
            lexical.as_ref(),
            tag.as_ref(),
        )?)
    }
}
/// Allocation bookkeeping plus a string interner for RDF term data.
///
/// The byte/allocation counters use relaxed atomics: they are statistics, not
/// synchronisation points.
///
/// NOTE(review): in this file [`RdfArena::alloc_str`] returns an ordinary heap `String`; the
/// `Bump` arena is created and reset but never allocated from.
#[derive(Debug)]
pub struct RdfArena {
    /// Bump allocator behind a mutex so `reset` can take `&self`.
    arena: std::sync::Mutex<Bump>,
    /// Interner backing [`RdfArena::intern_str`].
    interner: StringInterner,
    /// Total bytes requested through [`RdfArena::alloc_str`] since the last reset.
    allocated_bytes: std::sync::atomic::AtomicUsize,
    /// Number of [`RdfArena::alloc_str`] calls since the last reset.
    allocation_count: std::sync::atomic::AtomicUsize,
}
impl RdfArena {
    /// Creates an arena with a default-sized bump allocator.
    pub fn new() -> Self {
        Self::from_bump(Bump::new())
    }

    /// Creates an arena whose bump allocator pre-reserves `capacity` bytes.
    pub fn with_capacity(capacity: usize) -> Self {
        Self::from_bump(Bump::with_capacity(capacity))
    }

    /// Shared constructor: wraps `bump` and starts all counters at zero.
    fn from_bump(bump: Bump) -> Self {
        RdfArena {
            arena: std::sync::Mutex::new(bump),
            interner: StringInterner::new(),
            allocated_bytes: std::sync::atomic::AtomicUsize::new(0),
            allocation_count: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    /// Returns an owned copy of `s` and records its length in the statistics.
    pub fn alloc_str(&self, s: &str) -> String {
        self.allocated_bytes
            .fetch_add(s.len(), std::sync::atomic::Ordering::Relaxed);
        self.allocation_count
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        s.to_string()
    }

    /// Interns `s` in this arena's interner.
    pub fn intern_str(&self, s: &str) -> InternedString {
        InternedString::new_with_interner(s, &self.interner)
    }

    /// Releases everything held by the bump allocator and zeroes the statistics.
    pub fn reset(&self) {
        // A poisoned mutex only means some thread panicked while holding it. Resetting a bump
        // allocator is still sound, so recover the guard instead of silently skipping the
        // reset (which previously left stale counters behind).
        let mut arena = self
            .arena
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        arena.reset();
        self.allocated_bytes
            .store(0, std::sync::atomic::Ordering::Relaxed);
        self.allocation_count
            .store(0, std::sync::atomic::Ordering::Relaxed);
    }

    /// Bytes recorded by [`RdfArena::alloc_str`] since creation or the last reset.
    pub fn allocated_bytes(&self) -> usize {
        self.allocated_bytes
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Calls to [`RdfArena::alloc_str`] since creation or the last reset.
    pub fn allocation_count(&self) -> usize {
        self.allocation_count
            .load(std::sync::atomic::Ordering::Relaxed)
    }
}
impl Default for RdfArena {
    fn default() -> Self {
        Self::new()
    }
}
/// A borrowed, `Copy` view of an RDF term's text.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TermRef<'a> {
    /// IRI text of a named node.
    NamedNode(&'a str),
    /// Identifier text of a blank node.
    BlankNode(&'a str),
    /// Literal as `(lexical value, datatype IRI, language tag)`.
    Literal(&'a str, Option<&'a str>, Option<&'a str>),
    /// Name of a query variable.
    Variable(&'a str),
}
impl<'a> TermRef<'a> {
    /// Borrows the IRI of `node`.
    pub fn from_named_node(node: &'a NamedNode) -> Self {
        Self::NamedNode(node.as_str())
    }

    /// Borrows the identifier of `node`.
    pub fn from_blank_node(node: &'a BlankNode) -> Self {
        Self::BlankNode(node.as_str())
    }

    /// Borrows the lexical value and language tag of `literal`.
    ///
    /// NOTE(review): the datatype slot is always `None`, so a typed literal such as
    /// `"42"^^xsd:integer` comes back from [`TermRef::to_owned`] as a plain literal. Filling it
    /// in needs `Literal`'s datatype accessor — confirm its signature and how `xsd:string` is
    /// normalised before wiring it in.
    pub fn from_literal(literal: &'a Literal) -> Self {
        Self::Literal(literal.value(), None, literal.language())
    }

    /// The term's primary text: IRI, blank-node id, lexical value, or variable name.
    pub fn as_str(&self) -> &'a str {
        match *self {
            Self::NamedNode(text)
            | Self::BlankNode(text)
            | Self::Literal(text, _, _)
            | Self::Variable(text) => text,
        }
    }

    /// Builds an owned [`Term`], validating the borrowed text.
    ///
    /// For literals a language tag takes precedence over a datatype.
    ///
    /// # Errors
    /// Returns the constructor's error when the text is not a valid term of that kind.
    pub fn to_owned(&self) -> Result<Term, crate::OxirsError> {
        match *self {
            Self::NamedNode(iri) => NamedNode::new(iri).map(Term::NamedNode),
            Self::BlankNode(id) => BlankNode::new(id).map(Term::BlankNode),
            Self::Variable(name) => Variable::new(name).map(Term::Variable),
            Self::Literal(value, datatype, language) => {
                let literal = match (language, datatype) {
                    (Some(lang), _) => Literal::new_lang(value, lang)?,
                    (None, Some(dt)) => Literal::new_typed(value, NamedNode::new(dt)?),
                    (None, None) => Literal::new(value),
                };
                Ok(Term::Literal(literal))
            }
        }
    }

    /// `true` for [`TermRef::NamedNode`].
    pub fn is_named_node(&self) -> bool {
        matches!(*self, Self::NamedNode(..))
    }

    /// `true` for [`TermRef::BlankNode`].
    pub fn is_blank_node(&self) -> bool {
        matches!(*self, Self::BlankNode(..))
    }

    /// `true` for [`TermRef::Literal`].
    pub fn is_literal(&self) -> bool {
        matches!(*self, Self::Literal(..))
    }

    /// `true` for [`TermRef::Variable`].
    pub fn is_variable(&self) -> bool {
        matches!(*self, Self::Variable(..))
    }
}
impl<'a> std::fmt::Display for TermRef<'a> {
    /// Renders the term in N-Triples-like syntax: `<iri>`, the blank-node id as stored,
    /// `"value"` with optional `@lang` or `^^<datatype>`, and `?name` for variables.
    ///
    /// Literal values are escaped (`\"`, `\\`, `\n`, `\r`, `\t`, `\b`, `\f`) so that values
    /// containing quotes or line breaks still produce a well-formed quoted string.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TermRef::NamedNode(iri) => write!(f, "<{iri}>"),
            TermRef::BlankNode(id) => write!(f, "{id}"),
            TermRef::Literal(value, datatype, language) => {
                write_quoted_literal(f, value)?;
                // A language tag wins over a datatype, mirroring `TermRef::to_owned`.
                if let Some(lang) = language {
                    write!(f, "@{lang}")
                } else if let Some(dt) = datatype {
                    write!(f, "^^<{dt}>")
                } else {
                    Ok(())
                }
            }
            TermRef::Variable(name) => write!(f, "?{name}"),
        }
    }
}

/// Writes `value` surrounded by double quotes, escaping characters that would otherwise end
/// or break the quoted string (the N-Triples `ECHAR` set).
fn write_quoted_literal(f: &mut std::fmt::Formatter<'_>, value: &str) -> std::fmt::Result {
    use std::fmt::Write as _;
    f.write_char('"')?;
    for c in value.chars() {
        match c {
            '"' => f.write_str("\\\"")?,
            '\\' => f.write_str("\\\\")?,
            '\n' => f.write_str("\\n")?,
            '\r' => f.write_str("\\r")?,
            '\t' => f.write_str("\\t")?,
            '\u{08}' => f.write_str("\\b")?,
            '\u{0C}' => f.write_str("\\f")?,
            other => f.write_char(other)?,
        }
    }
    f.write_char('"')
}
/// A borrowed, `Copy` view of a triple, built from three [`TermRef`]s.
///
/// The fields are not restricted by position; any `TermRef` variant may appear in any slot.
/// Converting back with `TripleRef::to_owned` validates the positions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TripleRef<'a> {
/// Subject term.
pub subject: TermRef<'a>,
/// Predicate term.
pub predicate: TermRef<'a>,
/// Object term.
pub object: TermRef<'a>,
}
impl<'a> TripleRef<'a> {
    /// Assembles a triple view from three borrowed terms.
    pub fn new(subject: TermRef<'a>, predicate: TermRef<'a>, object: TermRef<'a>) -> Self {
        TripleRef {
            subject,
            predicate,
            object,
        }
    }

    /// Borrows the components of an owned [`Triple`].
    ///
    /// Quoted (RDF-star) triples have no [`TermRef`] representation and are replaced by the
    /// placeholder named node `<<quoted-triple>>`. That substitution is lossy: the original
    /// quoted triple cannot be recovered from the view.
    pub fn from_triple(triple: &'a Triple) -> Self {
        TripleRef {
            subject: match triple.subject() {
                Subject::NamedNode(n) => TermRef::NamedNode(n.as_str()),
                Subject::BlankNode(b) => TermRef::BlankNode(b.as_str()),
                Subject::Variable(v) => TermRef::Variable(v.as_str()),
                Subject::QuotedTriple(_) => TermRef::NamedNode("<<quoted-triple>>"),
            },
            predicate: match triple.predicate() {
                Predicate::NamedNode(n) => TermRef::NamedNode(n.as_str()),
                Predicate::Variable(v) => TermRef::Variable(v.as_str()),
            },
            object: match triple.object() {
                Object::NamedNode(n) => TermRef::NamedNode(n.as_str()),
                Object::BlankNode(b) => TermRef::BlankNode(b.as_str()),
                Object::Literal(l) => TermRef::from_literal(l),
                Object::Variable(v) => TermRef::Variable(v.as_str()),
                Object::QuotedTriple(_) => TermRef::NamedNode("<<quoted-triple>>"),
            },
        }
    }

    /// Builds an owned [`Triple`] from the view.
    ///
    /// Variables are accepted in every position, mirroring the `Variable` variants of
    /// `Subject`, `Predicate` and `Object`. This makes `from_triple` → `to_owned` round-trip
    /// for triple patterns as well as concrete triples.
    ///
    /// # Errors
    /// Fails if a term's text is invalid, or if a term kind is not allowed in its position
    /// (a literal subject or predicate, or a blank-node predicate).
    pub fn to_owned(&self) -> Result<Triple, crate::OxirsError> {
        let subject = match self.subject.to_owned()? {
            Term::NamedNode(n) => Subject::NamedNode(n),
            Term::BlankNode(b) => Subject::BlankNode(b),
            Term::Variable(v) => Subject::Variable(v),
            _ => return Err(crate::OxirsError::Parse("Invalid subject term".to_string())),
        };
        let predicate = match self.predicate.to_owned()? {
            Term::NamedNode(n) => Predicate::NamedNode(n),
            Term::Variable(v) => Predicate::Variable(v),
            _ => {
                return Err(crate::OxirsError::Parse(
                    "Invalid predicate term".to_string(),
                ))
            }
        };
        let object = match self.object.to_owned()? {
            Term::NamedNode(n) => Object::NamedNode(n),
            Term::BlankNode(b) => Object::BlankNode(b),
            Term::Literal(l) => Object::Literal(l),
            Term::Variable(v) => Object::Variable(v),
            _ => return Err(crate::OxirsError::Parse("Invalid object term".to_string())),
        };
        Ok(Triple::new(subject, predicate, object))
    }
}
impl<'a> std::fmt::Display for TripleRef<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} {} {} .", self.subject, self.predicate, self.object)
}
}
/// A copy-on-write triple set published through an epoch-managed atomic pointer.
///
/// Each operation pins the current thread's epoch for its own duration only. Readers see an
/// immutable snapshot. Writers clone the snapshot, modify the copy, and publish it with a
/// compare-and-swap, retrying on contention. Superseded snapshots are handed to crossbeam's
/// epoch collector, which frees them once no pinned thread can still observe them.
///
/// Every successful insert clones the whole set, so inserts are O(n).
#[derive(Debug)]
pub struct LockFreeGraph {
    /// Current snapshot. Never null between `new` and `drop`.
    data: Atomic<GraphData>,
}
/// One immutable snapshot of the graph contents.
#[derive(Debug)]
struct GraphData {
    triples: BTreeSet<Triple>,
    /// Incremented by every successful insert.
    version: u64,
}
impl LockFreeGraph {
    /// Creates an empty graph.
    pub fn new() -> Self {
        let initial_data = GraphData {
            triples: BTreeSet::new(),
            version: 0,
        };
        LockFreeGraph {
            data: Atomic::new(initial_data),
        }
    }

    /// Inserts `triple`, returning `false` if it was already present.
    pub fn insert(&self, triple: Triple) -> bool {
        // Pin only for this call. Storing a `Guard` in the struct would pin the thread
        // forever, block reclamation globally, and make the graph `!Send`/`!Sync`.
        let guard = &epoch::pin();
        loop {
            let current = self.data.load(Ordering::Acquire, guard);
            // SAFETY: `data` is initialised non-null in `new` and only ever replaced by
            // non-null pointers; while `guard` is pinned the snapshot cannot be reclaimed.
            let snapshot = unsafe { current.deref() };
            if snapshot.triples.contains(&triple) {
                return false;
            }
            let mut triples = snapshot.triples.clone();
            triples.insert(triple.clone());
            let next = Owned::new(GraphData {
                triples,
                version: snapshot.version + 1,
            });
            if self
                .data
                .compare_exchange_weak(current, next, Ordering::Release, Ordering::Relaxed, guard)
                .is_ok()
            {
                // SAFETY: `current` has just been unlinked; threads pinned before the swap
                // may still be reading it, so its destruction is deferred to the collector.
                unsafe { guard.defer_destroy(current) };
                return true;
            }
            // CAS lost a race (or failed spuriously): retry against the newer snapshot.
        }
    }

    /// Runs `read` against the current snapshot while the epoch is pinned.
    fn with_snapshot<R>(&self, read: impl FnOnce(&GraphData) -> R) -> R {
        let guard = &epoch::pin();
        let current = self.data.load(Ordering::Acquire, guard);
        // SAFETY: same invariant as in `insert` — non-null and kept alive by `guard`.
        read(unsafe { current.deref() })
    }

    /// Number of triples in the current snapshot.
    pub fn len(&self) -> usize {
        self.with_snapshot(|snapshot| snapshot.triples.len())
    }

    /// `true` when the current snapshot holds no triples.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// `true` when the current snapshot contains `triple`.
    pub fn contains(&self, triple: &Triple) -> bool {
        self.with_snapshot(|snapshot| snapshot.triples.contains(triple))
    }
}
impl Drop for LockFreeGraph {
    fn drop(&mut self) {
        // SAFETY: `&mut self` guarantees no other thread is inside a method of this graph, so
        // no pinned reader can still reference the current snapshot. It can therefore be freed
        // immediately, without going through the collector. Without this it would leak.
        unsafe {
            let current = self.data.load(Ordering::Acquire, epoch::unprotected());
            if !current.is_null() {
                drop(current.into_owned());
            }
        }
    }
}
impl Default for LockFreeGraph {
    fn default() -> Self {
        Self::new()
    }
}
/// Concurrent triple store indexed three ways over interned term text.
///
/// Each index is a two-level `DashMap` ending in a sorted set, written together by `insert`:
/// `spo` (subject → predicate → objects), `pos` (predicate → object → subjects) and
/// `osp` (object → subject → predicates). Literal objects are keyed by their `Display` form.
#[derive(Debug)]
pub struct OptimizedGraph {
// subject -> predicate -> {object}
spo: DashMap<InternedString, DashMap<InternedString, BTreeSet<InternedString>>>,
// predicate -> object -> {subject}
pos: DashMap<InternedString, DashMap<InternedString, BTreeSet<InternedString>>>,
// object -> subject -> {predicate}
osp: DashMap<InternedString, DashMap<InternedString, BTreeSet<InternedString>>>,
// Interner shared by all three indexes; query terms are interned through it as well.
interner: Arc<StringInterner>,
// Counters updated on successful inserts.
stats: Arc<RwLock<GraphStats>>,
}
/// Snapshot of [`OptimizedGraph`] statistics.
///
/// NOTE(review): in this file only `triple_count` and `intern_hit_ratio` are ever written (by
/// `OptimizedGraph::insert`); the remaining fields stay at their defaults — confirm whether
/// they are maintained elsewhere.
#[derive(Debug, Clone, Default)]
pub struct GraphStats {
/// Number of distinct triples inserted.
pub triple_count: usize,
pub unique_subjects: usize,
pub unique_predicates: usize,
pub unique_objects: usize,
pub index_memory_usage: usize,
/// Interner hit ratio sampled after the most recent successful insert.
pub intern_hit_ratio: f64,
}
impl OptimizedGraph {
pub fn new() -> Self {
OptimizedGraph {
spo: DashMap::new(),
pos: DashMap::new(),
osp: DashMap::new(),
interner: Arc::new(StringInterner::new()),
stats: Arc::new(RwLock::new(GraphStats::default())),
}
}
pub fn insert(&self, triple: &Triple) -> bool {
let subject = self.intern_subject(triple.subject());
let predicate = self.intern_predicate(triple.predicate());
let object = self.intern_object(triple.object());
let spo_entry = self.spo.entry(subject.clone()).or_default();
let mut po_entry = spo_entry.entry(predicate.clone()).or_default();
let was_new = po_entry.insert(object.clone());
if was_new {
let pos_entry = self.pos.entry(predicate.clone()).or_default();
let mut os_entry = pos_entry.entry(object.clone()).or_default();
os_entry.insert(subject.clone());
let osp_entry = self.osp.entry(object.clone()).or_default();
let mut sp_entry = osp_entry.entry(subject.clone()).or_default();
sp_entry.insert(predicate);
{
let mut stats = self.stats.write();
stats.triple_count += 1;
stats.intern_hit_ratio = self.interner.stats().hit_ratio();
}
}
was_new
}
pub fn query(
&self,
subject: Option<&Subject>,
predicate: Option<&Predicate>,
object: Option<&Object>,
) -> Vec<Triple> {
let mut results = Vec::new();
match (subject.is_some(), predicate.is_some(), object.is_some()) {
(true, true, true) => {
if let (Some(s), Some(p), Some(o)) = (subject, predicate, object) {
let s_intern = self.intern_subject(s);
let p_intern = self.intern_predicate(p);
let o_intern = self.intern_object(o);
if let Some(po_map) = self.spo.get(&s_intern) {
if let Some(o_set) = po_map.get(&p_intern) {
if o_set.contains(&o_intern) {
let triple = Triple::new(s.clone(), p.clone(), o.clone());
results.push(triple);
}
}
}
}
}
(true, true, false) => {
if let (Some(s), Some(p)) = (subject, predicate) {
let s_intern = self.intern_subject(s);
let p_intern = self.intern_predicate(p);
if let Some(po_map) = self.spo.get(&s_intern) {
if let Some(o_set) = po_map.get(&p_intern) {
for o_intern in o_set.iter() {
if let Ok(object) = self.unintern_object(o_intern) {
let triple = Triple::new(s.clone(), p.clone(), object);
results.push(triple);
}
}
}
}
}
}
(false, true, true) => {
if let (Some(p), Some(o)) = (predicate, object) {
let p_intern = self.intern_predicate(p);
let o_intern = self.intern_object(o);
if let Some(os_map) = self.pos.get(&p_intern) {
if let Some(s_set) = os_map.get(&o_intern) {
for s_intern in s_set.iter() {
if let Ok(subject) = self.unintern_subject(s_intern) {
let triple = Triple::new(subject, p.clone(), o.clone());
results.push(triple);
}
}
}
}
}
}
_ => {
for s_entry in &self.spo {
let s_intern = s_entry.key();
if let Ok(s) = self.unintern_subject(s_intern) {
if let Some(subj) = subject {
if subj != &s {
continue;
}
}
for po_entry in s_entry.value().iter() {
let p_intern = po_entry.key();
if let Ok(p) = self.unintern_predicate(p_intern) {
if let Some(pred) = predicate {
if pred != &p {
continue;
}
}
for o_intern in po_entry.value().iter() {
if let Ok(o) = self.unintern_object(o_intern) {
if let Some(obj) = object {
if obj != &o {
continue;
}
}
let triple = Triple::new(s.clone(), p.clone(), o);
results.push(triple);
}
}
}
}
}
}
}
}
results
}
pub fn stats(&self) -> GraphStats {
self.stats.read().clone()
}
fn intern_subject(&self, subject: &Subject) -> InternedString {
match subject {
Subject::NamedNode(n) => InternedString::new_with_interner(n.as_str(), &self.interner),
Subject::BlankNode(b) => InternedString::new_with_interner(b.as_str(), &self.interner),
Subject::Variable(v) => InternedString::new_with_interner(v.as_str(), &self.interner),
Subject::QuotedTriple(_) => {
InternedString::new_with_interner("<<quoted-triple>>", &self.interner)
}
}
}
fn intern_predicate(&self, predicate: &Predicate) -> InternedString {
match predicate {
Predicate::NamedNode(n) => {
InternedString::new_with_interner(n.as_str(), &self.interner)
}
Predicate::Variable(v) => InternedString::new_with_interner(v.as_str(), &self.interner),
}
}
fn intern_object(&self, object: &Object) -> InternedString {
match object {
Object::NamedNode(n) => InternedString::new_with_interner(n.as_str(), &self.interner),
Object::BlankNode(b) => InternedString::new_with_interner(b.as_str(), &self.interner),
Object::Literal(l) => {
let serialized = format!("{l}");
InternedString::new_with_interner(&serialized, &self.interner)
}
Object::Variable(v) => InternedString::new_with_interner(v.as_str(), &self.interner),
Object::QuotedTriple(_) => {
InternedString::new_with_interner("<<quoted-triple>>", &self.interner)
}
}
}
fn unintern_subject(&self, interned: &InternedString) -> Result<Subject, crate::OxirsError> {
let s = interned.as_str();
if s.starts_with("?") || s.starts_with("$") {
Variable::new(&s[1..]).map(Subject::Variable)
} else if s.starts_with("_:") {
BlankNode::new(s).map(Subject::BlankNode)
} else {
NamedNode::new(s).map(Subject::NamedNode)
}
}
fn unintern_predicate(
&self,
interned: &InternedString,
) -> Result<Predicate, crate::OxirsError> {
let s = interned.as_str();
if s.starts_with("?") || s.starts_with("$") {
Variable::new(&s[1..]).map(Predicate::Variable)
} else {
NamedNode::new(s).map(Predicate::NamedNode)
}
}
fn unintern_object(&self, interned: &InternedString) -> Result<Object, crate::OxirsError> {
let s = interned.as_str();
if s.starts_with("?") || s.starts_with("$") {
return Variable::new(&s[1..]).map(Object::Variable);
} else if let Some(stripped) = s.strip_prefix("\"") {
if let Some(end_quote) = stripped.find('"') {
let value = &stripped[..end_quote];
return Ok(Object::Literal(Literal::new(value)));
}
return Ok(Object::Literal(Literal::new(s)));
}
if s.starts_with("_:") {
BlankNode::new(s).map(Object::BlankNode)
} else {
NamedNode::new(s).map(Object::NamedNode)
}
}
}
impl Default for OptimizedGraph {
fn default() -> Self {
Self::new()
}
}
/// Queue of pending [`BatchOperation`]s drained in batches on a dedicated rayon pool.
#[cfg(feature = "parallel")]
#[derive(Debug)]
pub struct BatchProcessor {
// Lock-free FIFO of operations waiting to be processed.
operation_queue: SegQueue<BatchOperation>,
// Pool used by `process_batch` for parallel iteration.
processing_pool: rayon::ThreadPool,
// Statistics from the most recent batches.
stats: Arc<RwLock<BatchStats>>,
}
/// A single queued mutation of a quad store.
///
/// NOTE(review): `BatchProcessor::process_batch` currently matches every variant with an empty
/// arm, so none of these are applied — confirm where the real handling is meant to live.
#[derive(Debug, Clone)]
pub enum BatchOperation {
Insert(Quad),
Delete(Quad),
Update { old: Quad, new: Quad },
Compact,
}
/// Statistics recorded by `BatchProcessor::process_batch`.
#[derive(Debug, Default, Clone)]
pub struct BatchStats {
/// Running total of operations drained from the queue.
pub operations_processed: usize,
/// Batch size *requested* by the most recent call (not the number actually drained).
pub batch_size: usize,
/// Wall-clock duration of the most recent batch, in milliseconds.
pub processing_time_ms: u64,
/// Operations per second for the most recent batch (left unchanged if it took zero time).
pub throughput_ops_per_sec: f64,
}
#[cfg(feature = "parallel")]
impl BatchProcessor {
    /// Creates a processor backed by its own rayon pool of `num_threads` workers.
    ///
    /// # Panics
    /// Panics if the rayon thread pool cannot be built.
    pub fn new(num_threads: usize) -> Self {
        let processing_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .build()
            .expect("thread pool builder should succeed");
        Self {
            operation_queue: SegQueue::new(),
            processing_pool,
            stats: Arc::new(RwLock::new(BatchStats::default())),
        }
    }

    /// Enqueues `operation` for a later [`BatchProcessor::process_batch`] call.
    pub fn push(&self, operation: BatchOperation) {
        self.operation_queue.push(operation);
    }

    /// Drains up to `batch_size` queued operations, runs them on the pool, records
    /// statistics, and returns how many were drained (`0` leaves the stats untouched).
    pub fn process_batch(&self, batch_size: usize) -> Result<usize, crate::OxirsError> {
        let started = std::time::Instant::now();
        let mut batch = Vec::with_capacity(batch_size);
        // `take` stops pulling after `batch_size` items; `from_fn` stops at the first empty pop.
        batch.extend(std::iter::from_fn(|| self.operation_queue.pop()).take(batch_size));
        let processed = batch.len();
        if processed == 0 {
            return Ok(0);
        }
        self.processing_pool.install(|| {
            batch.par_iter().for_each(|operation| match operation {
                // NOTE(review): every arm is empty — drained operations are discarded without
                // being applied, yet are still counted as processed below.
                BatchOperation::Insert(_)
                | BatchOperation::Delete(_)
                | BatchOperation::Update { .. }
                | BatchOperation::Compact => {}
            });
        });
        let elapsed = started.elapsed();
        let seconds = elapsed.as_secs_f64();
        let mut stats = self.stats.write();
        stats.operations_processed += processed;
        stats.batch_size = batch_size;
        stats.processing_time_ms = elapsed.as_millis() as u64;
        if seconds > 0.0 {
            stats.throughput_ops_per_sec = processed as f64 / seconds;
        }
        drop(stats);
        Ok(processed)
    }

    /// Returns a copy of the current statistics.
    pub fn stats(&self) -> BatchStats {
        self.stats.read().clone()
    }

    /// Number of operations still waiting in the queue.
    pub fn pending_operations(&self) -> usize {
        self.operation_queue.len()
    }
}
#[cfg(feature = "parallel")]
impl Default for BatchProcessor {
    /// One worker per logical CPU.
    fn default() -> Self {
        Self::new(num_cpus::get())
    }
}
pub mod simd {
    #[cfg(feature = "simd")]
    use wide::{u8x32, CmpEq};

    /// ASCII characters that RFC 3987 never allows to appear literally in an IRI.
    const FORBIDDEN_ASCII: [u8; 10] = [b'<', b'>', b'"', b'{', b'}', b'|', b'\\', b'^', b'`', b' '];

    /// `true` for bytes that are never valid in an IRI on their own: ASCII controls
    /// (U+0000–U+001F, U+007F) and [`FORBIDDEN_ASCII`].
    fn is_forbidden_byte(byte: u8) -> bool {
        byte.is_ascii_control() || FORBIDDEN_ASCII.contains(&byte)
    }

    /// `true` if the UTF-8 `bytes` contain a C1 control character (U+0080–U+009F).
    ///
    /// Those code points are encoded as `0xC2` followed by `0x80..=0x9F`. Testing the raw
    /// byte range `0x80..=0x9F` instead would wrongly reject ordinary non-ASCII characters
    /// whose continuation bytes fall in it (e.g. `€` = `E2 82 AC`).
    fn contains_c1_control(bytes: &[u8]) -> bool {
        bytes
            .windows(2)
            .any(|pair| pair[0] == 0xC2 && (0x80..=0x9F).contains(&pair[1]))
    }

    /// Quick IRI sanity check: non-empty, with no forbidden ASCII characters and no C0/C1
    /// control characters. This is not a full RFC 3987 parse.
    ///
    /// The SIMD path tests 32 bytes at a time for the forbidden ASCII set. It returns the same
    /// result as the scalar build.
    #[cfg(feature = "simd")]
    pub fn validate_iri_fast(iri: &str) -> bool {
        if iri.is_empty() {
            return false;
        }
        let bytes = iri.as_bytes();
        let mut chunks = bytes.chunks_exact(32);
        for chunk in chunks.by_ref() {
            let mut lanes = [0u8; 32];
            lanes.copy_from_slice(chunk);
            let data = u8x32::from(lanes);
            for &forbidden in &FORBIDDEN_ASCII {
                let matches = data.simd_eq(u8x32::splat(forbidden));
                if matches.any() {
                    return false;
                }
            }
            if chunk.iter().any(|byte| byte.is_ascii_control()) {
                return false;
            }
        }
        if chunks.remainder().iter().any(|&byte| is_forbidden_byte(byte)) {
            return false;
        }
        !contains_c1_control(bytes)
    }

    /// Quick IRI sanity check: non-empty, with no forbidden ASCII characters and no C0/C1
    /// control characters. This is not a full RFC 3987 parse.
    #[cfg(not(feature = "simd"))]
    pub fn validate_iri_fast(iri: &str) -> bool {
        !iri.is_empty()
            && !iri.bytes().any(is_forbidden_byte)
            && !contains_c1_control(iri.as_bytes())
    }

    /// Compares two strings in *shortlex* order: shorter strings sort first, and strings of
    /// equal length are compared byte-wise.
    ///
    /// This is not `str::cmp` (here `"b" < "aa"`); use it only where some total order is
    /// needed and lexicographic order is not.
    pub fn compare_strings_fast(a: &str, b: &str) -> std::cmp::Ordering {
        a.len()
            .cmp(&b.len())
            .then_with(|| a.as_bytes().cmp(b.as_bytes()))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a test IRI, panicking on malformed input.
    fn iri(value: &str) -> NamedNode {
        NamedNode::new(value).expect("valid IRI")
    }

    /// Parts of `<http://example.org/s> <http://example.org/p> "test object"`.
    fn sample_parts() -> (NamedNode, NamedNode, Literal) {
        (
            iri("http://example.org/s"),
            iri("http://example.org/p"),
            Literal::new("test object"),
        )
    }

    #[test]
    fn test_rdf_arena() {
        let arena = RdfArena::new();
        let first = arena.alloc_str("test string 1");
        let second = arena.alloc_str("test string 2");
        assert_eq!(first, "test string 1");
        assert_eq!(second, "test string 2");
        assert!(arena.allocated_bytes() > 0);
        assert_eq!(arena.allocation_count(), 2);
    }

    #[test]
    fn test_term_ref() {
        let node = iri("http://example.org/test");
        let view = TermRef::from_named_node(&node);
        assert!(view.is_named_node());
        assert_eq!(view.as_str(), "http://example.org/test");
        let owned = view.to_owned().expect("operation should succeed");
        assert!(owned.is_named_node());
    }

    #[test]
    fn test_triple_ref() {
        let (s, p, o) = sample_parts();
        let triple = Triple::new(s, p, o);
        let view = TripleRef::from_triple(&triple);
        assert!(view.subject.is_named_node());
        assert!(view.predicate.is_named_node());
        assert!(view.object.is_literal());
        let rebuilt = view.to_owned().expect("operation should succeed");
        assert_eq!(rebuilt, triple);
    }

    #[test]
    fn test_lock_free_graph() {
        let graph = LockFreeGraph::new();
        assert!(graph.is_empty());
        let (s, p, o) = sample_parts();
        let triple = Triple::new(s, p, o);
        assert!(graph.insert(triple.clone()));
        assert!(!graph.insert(triple.clone()));
        assert_eq!(graph.len(), 1);
        assert!(graph.contains(&triple));
    }

    #[test]
    fn test_optimized_graph() {
        let graph = OptimizedGraph::new();
        let (subject, predicate, object) = sample_parts();
        let triple = Triple::new(subject.clone(), predicate.clone(), object.clone());
        assert!(graph.insert(&triple));
        assert!(!graph.insert(&triple));
        let exact = graph.query(
            Some(&Subject::NamedNode(subject.clone())),
            Some(&Predicate::NamedNode(predicate)),
            Some(&Object::Literal(object)),
        );
        assert_eq!(exact.len(), 1);
        assert_eq!(exact[0], triple);
        let by_subject = graph.query(Some(&Subject::NamedNode(subject)), None, None);
        assert_eq!(by_subject.len(), 1);
        assert_eq!(graph.stats().triple_count, 1);
    }

    #[test]
    fn test_simd_iri_validation() {
        let accepted = ["http://example.org/test"];
        let rejected = [
            "http://example.org/<invalid>",
            "",
            "http://example.org/test with spaces",
        ];
        for candidate in accepted {
            assert!(simd::validate_iri_fast(candidate));
        }
        for candidate in rejected {
            assert!(!simd::validate_iri_fast(candidate));
        }
    }

    #[test]
    fn test_simd_string_comparison() {
        use std::cmp::Ordering;
        let cases = [
            ("abc", "abc", Ordering::Equal),
            ("abc", "def", Ordering::Less),
            ("def", "abc", Ordering::Greater),
            ("short", "longer", Ordering::Less),
        ];
        for (left, right, expected) in cases {
            assert_eq!(simd::compare_strings_fast(left, right), expected);
        }
    }

    #[test]
    fn test_arena_reset() {
        let arena = RdfArena::new();
        arena.alloc_str("test");
        assert!(arena.allocated_bytes() > 0);
        arena.reset();
        assert_eq!(arena.allocated_bytes(), 0);
        assert_eq!(arena.allocation_count(), 0);
    }

    #[test]
    fn test_concurrent_optimized_graph() {
        use std::sync::Arc;
        use std::thread;
        let graph = Arc::new(OptimizedGraph::new());
        let workers: Vec<_> = (0..10)
            .map(|i| {
                let shared = Arc::clone(&graph);
                thread::spawn(move || {
                    let triple = Triple::new(
                        iri(&format!("http://example.org/s{i}")),
                        iri("http://example.org/p"),
                        Literal::new(format!("object{i}")),
                    );
                    shared.insert(&triple)
                })
            })
            .collect();
        let outcomes: Vec<bool> = workers
            .into_iter()
            .map(|worker| worker.join().expect("thread should not panic"))
            .collect();
        assert!(outcomes.iter().all(|&inserted| inserted));
        assert_eq!(graph.stats().triple_count, 10);
    }
}
/// A fixed-capacity byte buffer with an explicit fill length.
///
/// Bytes are appended with [`ZeroCopyBuffer::write`]. Alternatively, callers may fill
/// [`ZeroCopyBuffer::as_mut_slice`] (the whole capacity) directly and then publish the filled
/// prefix with [`ZeroCopyBuffer::set_len`].
pub struct ZeroCopyBuffer {
    /// Backing storage; its length is the buffer's capacity and never changes.
    data: Pin<Box<[u8]>>,
    /// Number of leading bytes of `data` that hold content.
    len: usize,
}
impl ZeroCopyBuffer {
    /// Same as [`ZeroCopyBuffer::with_capacity`].
    pub fn new(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }

    /// Allocates a zero-filled buffer of exactly `capacity` bytes with no content.
    pub fn with_capacity(capacity: usize) -> Self {
        ZeroCopyBuffer {
            data: Pin::new(vec![0; capacity].into_boxed_slice()),
            len: 0,
        }
    }

    /// The filled prefix.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[..self.len]
    }

    /// The *entire* backing storage, including bytes beyond `len`, for in-place filling.
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
        &mut self.data[..]
    }

    /// Total number of bytes the buffer can hold.
    pub fn capacity(&self) -> usize {
        self.data.len()
    }

    /// Number of filled bytes.
    pub fn len(&self) -> usize {
        self.len
    }

    /// `true` when no bytes are filled.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Marks the buffer empty without touching the stored bytes.
    pub fn clear(&mut self) {
        self.len = 0;
    }

    /// Alias for [`ZeroCopyBuffer::clear`].
    pub fn reset(&mut self) {
        self.clear();
    }

    /// Declares the first `len` bytes as filled.
    ///
    /// # Panics
    /// Panics if `len` exceeds the capacity.
    pub fn set_len(&mut self, len: usize) {
        assert!(len <= self.capacity());
        self.len = len;
    }

    /// Appends as much of `data` as fits and returns the number of bytes copied.
    ///
    /// Writing an empty slice always succeeds with `Ok(0)`, matching `std::io::Write`
    /// conventions.
    ///
    /// # Errors
    /// Returns [`std::io::ErrorKind::WriteZero`] when `data` is non-empty but the buffer is full.
    pub fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
        if data.is_empty() {
            return Ok(0);
        }
        let start = self.len;
        let to_write = data.len().min(self.capacity() - start);
        if to_write == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::WriteZero,
                "Buffer is full",
            ));
        }
        // Safe slice copy; the bounds were established above, so this cannot panic.
        self.data[start..start + to_write].copy_from_slice(&data[..to_write]);
        self.len += to_write;
        Ok(to_write)
    }
}
/// Thin wrapper over `simd_json` parsing entry points, plus a `serde_json` fallback for
/// read-only input.
#[derive(Clone)]
pub struct SimdJsonProcessor;
impl SimdJsonProcessor {
    /// Creates the (stateless) processor.
    pub fn new() -> Self {
        Self
    }

    /// Parses `json` into a value that may borrow strings from the buffer.
    ///
    /// `simd_json` takes the buffer mutably and may rewrite it in place during parsing.
    pub fn parse<'a>(
        &mut self,
        json: &'a mut [u8],
    ) -> Result<simd_json::BorrowedValue<'a>, simd_json::Error> {
        simd_json::to_borrowed_value(json)
    }

    /// Parses a mutable string slice in place by delegating to [`SimdJsonProcessor::parse`].
    pub fn parse_str<'a>(
        &mut self,
        json: &'a mut str,
    ) -> Result<simd_json::BorrowedValue<'a>, simd_json::Error> {
        // NOTE(review): unsound as written. `as_bytes_mut` requires the bytes to stay valid
        // UTF-8, but the in-place parser may rewrite the buffer (e.g. while unescaping) and
        // leave `json` holding invalid UTF-8. A sound version needs an `unsafe fn` contract or
        // a parse from a copied buffer — decide before relying on this API.
        let bytes = unsafe { json.as_bytes_mut() };
        self.parse(bytes)
    }

    /// Parses `json` into a fully owned value.
    pub fn parse_owned(
        &mut self,
        json: &mut [u8],
    ) -> Result<simd_json::OwnedValue, simd_json::Error> {
        simd_json::to_owned_value(json)
    }

    /// Parses read-only input with `serde_json`; the input is not modified.
    pub fn parse_json(&self, json: &[u8]) -> Result<serde_json::Value, serde_json::Error> {
        serde_json::from_slice(json)
    }
}
impl Default for SimdJsonProcessor {
    fn default() -> Self {
        Self::new()
    }
}
/// Byte-level helpers for scanning and splitting XML text.
///
/// On `x86_64` the scanners use SSE2 (part of the x86_64 baseline) to test 16 bytes per
/// step. Other targets use an equivalent scalar scan.
#[derive(Clone, Debug)]
pub struct SimdXmlProcessor {
    /// Scratch buffer; in this file only its capacity is managed.
    scan_buffer: Vec<u8>,
}
impl SimdXmlProcessor {
    /// Creates a processor with a 4 KiB scratch buffer.
    pub fn new() -> Self {
        SimdXmlProcessor {
            scan_buffer: Vec::with_capacity(4096),
        }
    }

    /// Index of the first XML-special byte (`<`, `>`, `&`, `"`, `'`) in `data`, if any.
    #[cfg(target_arch = "x86_64")]
    pub fn find_special_char(&self, data: &[u8]) -> Option<usize> {
        use std::arch::x86_64::*;
        const CHUNK_SIZE: usize = 16;
        let mut offset = 0;
        if data.len() >= CHUNK_SIZE {
            // SAFETY: SSE2 is always available on x86_64, and every unaligned 16-byte load
            // reads `data[offset..offset + 16]`, which the loop condition keeps in bounds.
            unsafe {
                let lt = _mm_set1_epi8(b'<' as i8);
                let gt = _mm_set1_epi8(b'>' as i8);
                let amp = _mm_set1_epi8(b'&' as i8);
                let quot = _mm_set1_epi8(b'"' as i8);
                let apos = _mm_set1_epi8(b'\'' as i8);
                while offset + CHUNK_SIZE <= data.len() {
                    let chunk = _mm_loadu_si128(data.as_ptr().add(offset) as *const __m128i);
                    let eq_lt = _mm_cmpeq_epi8(chunk, lt);
                    let eq_gt = _mm_cmpeq_epi8(chunk, gt);
                    let eq_amp = _mm_cmpeq_epi8(chunk, amp);
                    let eq_quot = _mm_cmpeq_epi8(chunk, quot);
                    let eq_apos = _mm_cmpeq_epi8(chunk, apos);
                    let any_match = _mm_or_si128(
                        _mm_or_si128(_mm_or_si128(eq_lt, eq_gt), eq_amp),
                        _mm_or_si128(eq_quot, eq_apos),
                    );
                    // One bit per lane; the lowest set bit is the first match in the chunk.
                    let mask = _mm_movemask_epi8(any_match);
                    if mask != 0 {
                        return Some(offset + mask.trailing_zeros() as usize);
                    }
                    offset += CHUNK_SIZE;
                }
            }
        }
        // Scalar scan of the tail shorter than one chunk.
        data[offset..]
            .iter()
            .position(|&b| matches!(b, b'<' | b'>' | b'&' | b'"' | b'\''))
            .map(|i| i + offset)
    }

    /// Index of the first XML-special byte (`<`, `>`, `&`, `"`, `'`) in `data`, if any.
    #[cfg(not(target_arch = "x86_64"))]
    pub fn find_special_char(&self, data: &[u8]) -> Option<usize> {
        data.iter()
            .position(|&b| matches!(b, b'<' | b'>' | b'&' | b'"' | b'\''))
    }

    /// `true` when `data` is valid UTF-8.
    pub fn is_valid_utf8(&self, data: &[u8]) -> bool {
        std::str::from_utf8(data).is_ok()
    }

    /// Strips leading and trailing XML whitespace (space, tab, LF, CR).
    pub fn trim_whitespace<'a>(&self, data: &'a [u8]) -> &'a [u8] {
        let start = data
            .iter()
            .position(|&b| !matches!(b, b' ' | b'\t' | b'\n' | b'\r'))
            .unwrap_or(data.len());
        let end = data
            .iter()
            .rposition(|&b| !matches!(b, b' ' | b'\t' | b'\n' | b'\r'))
            .map(|i| i + 1)
            .unwrap_or(0);
        if start >= end {
            &[]
        } else {
            &data[start..end]
        }
    }

    /// Index of the first `:` in `data`, if any.
    #[cfg(target_arch = "x86_64")]
    pub fn find_colon(&self, data: &[u8]) -> Option<usize> {
        use std::arch::x86_64::*;
        const CHUNK_SIZE: usize = 16;
        let mut offset = 0;
        if data.len() >= CHUNK_SIZE {
            // SAFETY: SSE2 is baseline on x86_64; the loop condition keeps each 16-byte
            // unaligned load inside `data`.
            unsafe {
                let colon = _mm_set1_epi8(b':' as i8);
                while offset + CHUNK_SIZE <= data.len() {
                    let chunk = _mm_loadu_si128(data.as_ptr().add(offset) as *const __m128i);
                    let eq = _mm_cmpeq_epi8(chunk, colon);
                    let mask = _mm_movemask_epi8(eq);
                    if mask != 0 {
                        return Some(offset + mask.trailing_zeros() as usize);
                    }
                    offset += CHUNK_SIZE;
                }
            }
        }
        data[offset..]
            .iter()
            .position(|&b| b == b':')
            .map(|i| i + offset)
    }

    /// Index of the first `:` in `data`, if any.
    #[cfg(not(target_arch = "x86_64"))]
    pub fn find_colon(&self, data: &[u8]) -> Option<usize> {
        data.iter().position(|&b| b == b':')
    }

    /// Splits a qualified name at its first colon into `(prefix, local)`. A name without a
    /// colon yields an empty prefix.
    pub fn parse_qname<'a>(&self, qname: &'a [u8]) -> (&'a [u8], &'a [u8]) {
        match self.find_colon(qname) {
            Some(pos) => (&qname[..pos], &qname[pos + 1..]),
            None => (&[], qname),
        }
    }

    /// Expands `prefix:local` to `namespace + local` using `namespaces`.
    ///
    /// Returns `None` if either part is not UTF-8 or the prefix is unknown. An empty prefix
    /// is looked up under the key `""`.
    pub fn expand_name<'a>(
        &self,
        prefix: &'a [u8],
        local: &'a [u8],
        namespaces: &HashMap<String, String>,
    ) -> Option<String> {
        let prefix_str = std::str::from_utf8(prefix).ok()?;
        let local_str = std::str::from_utf8(local).ok()?;
        namespaces
            .get(prefix_str)
            .map(|ns| format!("{}{}", ns, local_str))
    }

    /// Ensures the scratch buffer can hold at least `capacity` bytes without reallocating.
    pub fn ensure_buffer_capacity(&mut self, capacity: usize) {
        // `Vec::reserve` counts *additional elements beyond the current length*. The previous
        // `capacity - self.capacity()` under-reserved, e.g. 4096 → 5000 was a no-op.
        let additional = capacity.saturating_sub(self.scan_buffer.len());
        self.scan_buffer.reserve(additional);
    }
}
impl Default for SimdXmlProcessor {
    fn default() -> Self {
        Self::new()
    }
}