1use crate::interning::{InternedString, StringInterner};
8use crate::model::*;
9use bumpalo::Bump;
10use crossbeam::epoch::{self, Atomic, Owned};
11use crossbeam::queue::SegQueue;
12use dashmap::DashMap;
13use parking_lot::RwLock;
14#[cfg(feature = "parallel")]
15use rayon::iter::IntoParallelRefIterator;
16#[cfg(feature = "parallel")]
17use rayon::iter::ParallelIterator;
18use simd_json;
19use std::collections::{BTreeSet, HashMap};
20use std::pin::Pin;
21use std::sync::atomic::Ordering;
22use std::sync::Arc;
23
/// Name under which the crate's [`StringInterner`] is used when interning RDF term text.
pub type TermInterner = StringInterner;
26
/// Extension methods that build RDF terms from string data via an interner.
///
/// Every fallible method reports failures as [`crate::OxirsError`].
pub trait TermInternerExt {
    /// Builds a [`NamedNode`] from the IRI text `iri`.
    fn intern_named_node(&self, iri: &str) -> Result<NamedNode, crate::OxirsError>;

    /// Produces a [`BlankNode`]; takes no input, so the identifier is chosen by the implementation.
    fn intern_blank_node(&self) -> BlankNode;

    /// Builds a [`Literal`] from the lexical form `value` alone (no datatype or language argument).
    fn intern_literal(&self, value: &str) -> Result<Literal, crate::OxirsError>;

    /// Builds a [`Literal`] from the lexical form `value` and the datatype IRI `datatype_iri`.
    fn intern_literal_with_datatype(
        &self,
        value: &str,
        datatype_iri: &str,
    ) -> Result<Literal, crate::OxirsError>;

    /// Builds a [`Literal`] from the lexical form `value` and the language tag `language`.
    fn intern_literal_with_language(
        &self,
        value: &str,
        language: &str,
    ) -> Result<Literal, crate::OxirsError>;
}
52
53impl TermInternerExt for TermInterner {
54 fn intern_named_node(&self, iri: &str) -> Result<NamedNode, crate::OxirsError> {
55 let interned = self.intern(iri);
57 NamedNode::new(interned.as_ref())
59 }
60
61 fn intern_blank_node(&self) -> BlankNode {
62 BlankNode::new_unique()
64 }
65
66 fn intern_literal(&self, value: &str) -> Result<Literal, crate::OxirsError> {
67 let interned = self.intern(value);
69 Ok(Literal::new_simple_literal(interned.as_ref()))
71 }
72
73 fn intern_literal_with_datatype(
74 &self,
75 value: &str,
76 datatype_iri: &str,
77 ) -> Result<Literal, crate::OxirsError> {
78 let value_interned = self.intern(value);
80 let datatype_interned = self.intern(datatype_iri);
81 let datatype_node = NamedNode::new(datatype_interned.as_ref())?;
83 Ok(Literal::new_typed_literal(
84 value_interned.as_ref(),
85 datatype_node,
86 ))
87 }
88
89 fn intern_literal_with_language(
90 &self,
91 value: &str,
92 language: &str,
93 ) -> Result<Literal, crate::OxirsError> {
94 let value_interned = self.intern(value);
96 let language_interned = self.intern(language);
97 let literal = Literal::new_language_tagged_literal(
99 value_interned.as_ref(),
100 language_interned.as_ref(),
101 )?;
102 Ok(literal)
103 }
104}
105
/// Bundles a mutex-guarded `bumpalo` arena, a string interner and two
/// allocation counters.
#[derive(Debug)]
pub struct RdfArena {
    /// Bump allocator behind a mutex; in the code visible here it is only touched by `reset`.
    arena: std::sync::Mutex<Bump>,
    /// Interner used by `intern_str`.
    interner: StringInterner,
    /// Sum of the byte lengths passed to `alloc_str` since construction or the last reset.
    allocated_bytes: std::sync::atomic::AtomicUsize,
    /// Number of `alloc_str` calls since construction or the last reset.
    allocation_count: std::sync::atomic::AtomicUsize,
}
119
120impl RdfArena {
121 pub fn new() -> Self {
123 RdfArena {
124 arena: std::sync::Mutex::new(Bump::new()),
125 interner: StringInterner::new(),
126 allocated_bytes: std::sync::atomic::AtomicUsize::new(0),
127 allocation_count: std::sync::atomic::AtomicUsize::new(0),
128 }
129 }
130
131 pub fn with_capacity(capacity: usize) -> Self {
133 RdfArena {
134 arena: std::sync::Mutex::new(Bump::with_capacity(capacity)),
135 interner: StringInterner::new(),
136 allocated_bytes: std::sync::atomic::AtomicUsize::new(0),
137 allocation_count: std::sync::atomic::AtomicUsize::new(0),
138 }
139 }
140
141 pub fn alloc_str(&self, s: &str) -> String {
143 self.allocated_bytes
146 .fetch_add(s.len(), std::sync::atomic::Ordering::Relaxed);
147 self.allocation_count
148 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
149 s.to_string()
150 }
151
152 pub fn intern_str(&self, s: &str) -> InternedString {
154 InternedString::new_with_interner(s, &self.interner)
155 }
156
157 pub fn reset(&self) {
159 if let Ok(mut arena) = self.arena.lock() {
160 arena.reset();
161 self.allocated_bytes
162 .store(0, std::sync::atomic::Ordering::Relaxed);
163 self.allocation_count
164 .store(0, std::sync::atomic::Ordering::Relaxed);
165 }
166 }
167
168 pub fn allocated_bytes(&self) -> usize {
170 self.allocated_bytes
171 .load(std::sync::atomic::Ordering::Relaxed)
172 }
173
174 pub fn allocation_count(&self) -> usize {
176 self.allocation_count
177 .load(std::sync::atomic::Ordering::Relaxed)
178 }
179}
180
181impl Default for RdfArena {
182 fn default() -> Self {
183 Self::new()
184 }
185}
186
/// Borrowed, copyable view of an RDF term made of string slices.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TermRef<'a> {
    /// IRI text of a named node.
    NamedNode(&'a str),
    /// Identifier text of a blank node.
    BlankNode(&'a str),
    /// Literal as `(value, datatype IRI, language tag)`; the last two are optional.
    Literal(&'a str, Option<&'a str>, Option<&'a str>),
    /// Variable name.
    Variable(&'a str),
}
197
198impl<'a> TermRef<'a> {
199 pub fn from_named_node(node: &'a NamedNode) -> Self {
201 TermRef::NamedNode(node.as_str())
202 }
203
204 pub fn from_blank_node(node: &'a BlankNode) -> Self {
206 TermRef::BlankNode(node.as_str())
207 }
208
209 pub fn from_literal(literal: &'a Literal) -> Self {
211 let language = literal.language();
212 TermRef::Literal(literal.value(), None, language)
215 }
216
217 pub fn as_str(&self) -> &'a str {
219 match self {
220 TermRef::NamedNode(s) => s,
221 TermRef::BlankNode(s) => s,
222 TermRef::Literal(s, _, _) => s,
223 TermRef::Variable(s) => s,
224 }
225 }
226
227 pub fn to_owned(&self) -> Result<Term, crate::OxirsError> {
229 match self {
230 TermRef::NamedNode(iri) => NamedNode::new(*iri).map(Term::NamedNode),
231 TermRef::BlankNode(id) => BlankNode::new(*id).map(Term::BlankNode),
232 TermRef::Literal(value, datatype, language) => {
233 let literal = if let Some(lang) = language {
234 Literal::new_lang(*value, *lang)?
235 } else if let Some(dt) = datatype {
236 let dt_node = NamedNode::new(*dt)?;
237 Literal::new_typed(*value, dt_node)
238 } else {
239 Literal::new(*value)
240 };
241 Ok(Term::Literal(literal))
242 }
243 TermRef::Variable(name) => Variable::new(*name).map(Term::Variable),
244 }
245 }
246
247 pub fn is_named_node(&self) -> bool {
249 matches!(self, TermRef::NamedNode(_))
250 }
251
252 pub fn is_blank_node(&self) -> bool {
254 matches!(self, TermRef::BlankNode(_))
255 }
256
257 pub fn is_literal(&self) -> bool {
259 matches!(self, TermRef::Literal(_, _, _))
260 }
261
262 pub fn is_variable(&self) -> bool {
264 matches!(self, TermRef::Variable(_))
265 }
266}
267
268impl<'a> std::fmt::Display for TermRef<'a> {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 match self {
271 TermRef::NamedNode(iri) => write!(f, "<{iri}>"),
272 TermRef::BlankNode(id) => write!(f, "{id}"),
273 TermRef::Literal(value, datatype, language) => {
274 write!(f, "\"{value}\"")?;
275 if let Some(lang) = language {
276 write!(f, "@{lang}")?;
277 } else if let Some(dt) = datatype {
278 write!(f, "^^<{dt}>")?;
279 }
280 Ok(())
281 }
282 TermRef::Variable(name) => write!(f, "?{name}"),
283 }
284 }
285}
286
/// Borrowed, copyable view of a triple built from three [`TermRef`]s.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TripleRef<'a> {
    /// Subject position.
    pub subject: TermRef<'a>,
    /// Predicate position.
    pub predicate: TermRef<'a>,
    /// Object position.
    pub object: TermRef<'a>,
}
294
295impl<'a> TripleRef<'a> {
296 pub fn new(subject: TermRef<'a>, predicate: TermRef<'a>, object: TermRef<'a>) -> Self {
298 TripleRef {
299 subject,
300 predicate,
301 object,
302 }
303 }
304
305 pub fn from_triple(triple: &'a Triple) -> Self {
307 TripleRef {
308 subject: match triple.subject() {
309 Subject::NamedNode(n) => TermRef::NamedNode(n.as_str()),
310 Subject::BlankNode(b) => TermRef::BlankNode(b.as_str()),
311 Subject::Variable(v) => TermRef::Variable(v.as_str()),
312 Subject::QuotedTriple(_) => TermRef::NamedNode("<<quoted-triple>>"),
313 },
314 predicate: match triple.predicate() {
315 Predicate::NamedNode(n) => TermRef::NamedNode(n.as_str()),
316 Predicate::Variable(v) => TermRef::Variable(v.as_str()),
317 },
318 object: match triple.object() {
319 Object::NamedNode(n) => TermRef::NamedNode(n.as_str()),
320 Object::BlankNode(b) => TermRef::BlankNode(b.as_str()),
321 Object::Literal(l) => TermRef::from_literal(l),
322 Object::Variable(v) => TermRef::Variable(v.as_str()),
323 Object::QuotedTriple(_) => TermRef::NamedNode("<<quoted-triple>>"),
324 },
325 }
326 }
327
328 pub fn to_owned(&self) -> Result<Triple, crate::OxirsError> {
330 let subject = match self.subject.to_owned()? {
331 Term::NamedNode(n) => Subject::NamedNode(n),
332 Term::BlankNode(b) => Subject::BlankNode(b),
333 _ => return Err(crate::OxirsError::Parse("Invalid subject term".to_string())),
334 };
335
336 let predicate = match self.predicate.to_owned()? {
337 Term::NamedNode(n) => Predicate::NamedNode(n),
338 _ => {
339 return Err(crate::OxirsError::Parse(
340 "Invalid predicate term".to_string(),
341 ))
342 }
343 };
344
345 let object = match self.object.to_owned()? {
346 Term::NamedNode(n) => Object::NamedNode(n),
347 Term::BlankNode(b) => Object::BlankNode(b),
348 Term::Literal(l) => Object::Literal(l),
349 _ => return Err(crate::OxirsError::Parse("Invalid object term".to_string())),
350 };
351
352 Ok(Triple::new(subject, predicate, object))
353 }
354}
355
356impl<'a> std::fmt::Display for TripleRef<'a> {
357 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358 write!(f, "{} {} {} .", self.subject, self.predicate, self.object)
359 }
360}
361
362#[derive(Debug)]
364pub struct LockFreeGraph {
365 data: Atomic<GraphData>,
367 epoch: epoch::Guard,
369}
370
371#[derive(Debug)]
373struct GraphData {
374 triples: BTreeSet<Triple>,
376 version: u64,
378}
379
380impl LockFreeGraph {
381 pub fn new() -> Self {
383 let initial_data = GraphData {
384 triples: BTreeSet::new(),
385 version: 0,
386 };
387
388 LockFreeGraph {
389 data: Atomic::new(initial_data),
390 epoch: epoch::pin(),
391 }
392 }
393
394 pub fn insert(&self, triple: Triple) -> bool {
396 loop {
397 let current = self.data.load(Ordering::Acquire, &self.epoch);
398 let current_ref = unsafe { current.deref() };
399
400 if current_ref.triples.contains(&triple) {
402 return false;
403 }
404
405 let mut new_triples = current_ref.triples.clone();
407 new_triples.insert(triple.clone());
408
409 let new_data = GraphData {
410 triples: new_triples,
411 version: current_ref.version + 1,
412 };
413
414 match self.data.compare_exchange_weak(
416 current,
417 Owned::new(new_data),
418 Ordering::Release,
419 Ordering::Relaxed,
420 &self.epoch,
421 ) {
422 Ok(_) => {
423 unsafe {
425 self.epoch.defer_destroy(current);
426 }
427 return true;
428 }
429 Err(_) => {
430 continue;
432 }
433 }
434 }
435 }
436
437 pub fn len(&self) -> usize {
439 let current = self.data.load(Ordering::Acquire, &self.epoch);
440 unsafe { current.deref().triples.len() }
441 }
442
443 pub fn is_empty(&self) -> bool {
445 self.len() == 0
446 }
447
448 pub fn contains(&self, triple: &Triple) -> bool {
450 let current = self.data.load(Ordering::Acquire, &self.epoch);
451 unsafe { current.deref().triples.contains(triple) }
452 }
453}
454
455impl Default for LockFreeGraph {
456 fn default() -> Self {
457 Self::new()
458 }
459}
460
/// Triple store keeping three nested concurrent indexes over interned strings.
#[derive(Debug)]
pub struct OptimizedGraph {
    /// subject -> predicate -> set of objects.
    spo: DashMap<InternedString, DashMap<InternedString, BTreeSet<InternedString>>>,
    /// predicate -> object -> set of subjects.
    pos: DashMap<InternedString, DashMap<InternedString, BTreeSet<InternedString>>>,
    /// object -> subject -> set of predicates.
    osp: DashMap<InternedString, DashMap<InternedString, BTreeSet<InternedString>>>,
    /// Interner shared by all index keys.
    interner: Arc<StringInterner>,
    /// Counters updated by `insert` and returned (as a clone) by `stats`.
    stats: Arc<RwLock<GraphStats>>,
}
475
/// Statistics reported by `OptimizedGraph::stats`.
///
/// Within the code visible in this file only `triple_count` and
/// `intern_hit_ratio` are ever written; the other fields keep their default
/// of zero unless code elsewhere updates them.
#[derive(Debug, Clone, Default)]
pub struct GraphStats {
    /// Number of triples for which `insert` returned `true`.
    pub triple_count: usize,
    /// Count of distinct subjects.
    pub unique_subjects: usize,
    /// Count of distinct predicates.
    pub unique_predicates: usize,
    /// Count of distinct objects.
    pub unique_objects: usize,
    /// Memory used by the indexes (unit not established in this file).
    pub index_memory_usage: usize,
    /// Hit ratio reported by the interner at the time of the last new insert.
    pub intern_hit_ratio: f64,
}
486
487impl OptimizedGraph {
488 pub fn new() -> Self {
490 OptimizedGraph {
491 spo: DashMap::new(),
492 pos: DashMap::new(),
493 osp: DashMap::new(),
494 interner: Arc::new(StringInterner::new()),
495 stats: Arc::new(RwLock::new(GraphStats::default())),
496 }
497 }
498
499 pub fn insert(&self, triple: &Triple) -> bool {
501 let subject = self.intern_subject(triple.subject());
502 let predicate = self.intern_predicate(triple.predicate());
503 let object = self.intern_object(triple.object());
504
505 let spo_entry = self.spo.entry(subject.clone()).or_default();
507 let mut po_entry = spo_entry.entry(predicate.clone()).or_default();
508 let was_new = po_entry.insert(object.clone());
509
510 if was_new {
511 let pos_entry = self.pos.entry(predicate.clone()).or_default();
513 let mut os_entry = pos_entry.entry(object.clone()).or_default();
514 os_entry.insert(subject.clone());
515
516 let osp_entry = self.osp.entry(object.clone()).or_default();
518 let mut sp_entry = osp_entry.entry(subject.clone()).or_default();
519 sp_entry.insert(predicate);
520
521 {
523 let mut stats = self.stats.write();
524 stats.triple_count += 1;
525 stats.intern_hit_ratio = self.interner.stats().hit_ratio();
526 }
527 }
528
529 was_new
530 }
531
532 pub fn query(
534 &self,
535 subject: Option<&Subject>,
536 predicate: Option<&Predicate>,
537 object: Option<&Object>,
538 ) -> Vec<Triple> {
539 let mut results = Vec::new();
540
541 match (subject.is_some(), predicate.is_some(), object.is_some()) {
543 (true, true, true) => {
544 if let (Some(s), Some(p), Some(o)) = (subject, predicate, object) {
546 let s_intern = self.intern_subject(s);
547 let p_intern = self.intern_predicate(p);
548 let o_intern = self.intern_object(o);
549
550 if let Some(po_map) = self.spo.get(&s_intern) {
551 if let Some(o_set) = po_map.get(&p_intern) {
552 if o_set.contains(&o_intern) {
553 let triple = Triple::new(s.clone(), p.clone(), o.clone());
554 results.push(triple);
555 }
556 }
557 }
558 }
559 }
560 (true, true, false) => {
561 if let (Some(s), Some(p)) = (subject, predicate) {
563 let s_intern = self.intern_subject(s);
564 let p_intern = self.intern_predicate(p);
565
566 if let Some(po_map) = self.spo.get(&s_intern) {
567 if let Some(o_set) = po_map.get(&p_intern) {
568 for o_intern in o_set.iter() {
569 if let Ok(object) = self.unintern_object(o_intern) {
570 let triple = Triple::new(s.clone(), p.clone(), object);
571 results.push(triple);
572 }
573 }
574 }
575 }
576 }
577 }
578 (false, true, true) => {
579 if let (Some(p), Some(o)) = (predicate, object) {
581 let p_intern = self.intern_predicate(p);
582 let o_intern = self.intern_object(o);
583
584 if let Some(os_map) = self.pos.get(&p_intern) {
585 if let Some(s_set) = os_map.get(&o_intern) {
586 for s_intern in s_set.iter() {
587 if let Ok(subject) = self.unintern_subject(s_intern) {
588 let triple = Triple::new(subject, p.clone(), o.clone());
589 results.push(triple);
590 }
591 }
592 }
593 }
594 }
595 }
596 _ => {
597 for s_entry in &self.spo {
599 let s_intern = s_entry.key();
600 if let Ok(s) = self.unintern_subject(s_intern) {
601 if let Some(subj) = subject {
602 if subj != &s {
603 continue;
604 }
605 }
606
607 for po_entry in s_entry.value().iter() {
608 let p_intern = po_entry.key();
609 if let Ok(p) = self.unintern_predicate(p_intern) {
610 if let Some(pred) = predicate {
611 if pred != &p {
612 continue;
613 }
614 }
615
616 for o_intern in po_entry.value().iter() {
617 if let Ok(o) = self.unintern_object(o_intern) {
618 if let Some(obj) = object {
619 if obj != &o {
620 continue;
621 }
622 }
623
624 let triple = Triple::new(s.clone(), p.clone(), o);
625 results.push(triple);
626 }
627 }
628 }
629 }
630 }
631 }
632 }
633 }
634
635 results
636 }
637
638 pub fn stats(&self) -> GraphStats {
640 self.stats.read().clone()
641 }
642
643 fn intern_subject(&self, subject: &Subject) -> InternedString {
645 match subject {
646 Subject::NamedNode(n) => InternedString::new_with_interner(n.as_str(), &self.interner),
647 Subject::BlankNode(b) => InternedString::new_with_interner(b.as_str(), &self.interner),
648 Subject::Variable(v) => InternedString::new_with_interner(v.as_str(), &self.interner),
649 Subject::QuotedTriple(_) => {
650 InternedString::new_with_interner("<<quoted-triple>>", &self.interner)
651 }
652 }
653 }
654
655 fn intern_predicate(&self, predicate: &Predicate) -> InternedString {
657 match predicate {
658 Predicate::NamedNode(n) => {
659 InternedString::new_with_interner(n.as_str(), &self.interner)
660 }
661 Predicate::Variable(v) => InternedString::new_with_interner(v.as_str(), &self.interner),
662 }
663 }
664
665 fn intern_object(&self, object: &Object) -> InternedString {
667 match object {
668 Object::NamedNode(n) => InternedString::new_with_interner(n.as_str(), &self.interner),
669 Object::BlankNode(b) => InternedString::new_with_interner(b.as_str(), &self.interner),
670 Object::Literal(l) => {
671 let serialized = format!("{l}");
673 InternedString::new_with_interner(&serialized, &self.interner)
674 }
675 Object::Variable(v) => InternedString::new_with_interner(v.as_str(), &self.interner),
676 Object::QuotedTriple(_) => {
677 InternedString::new_with_interner("<<quoted-triple>>", &self.interner)
678 }
679 }
680 }
681
682 fn unintern_subject(&self, interned: &InternedString) -> Result<Subject, crate::OxirsError> {
684 let s = interned.as_str();
685 if s.starts_with("?") || s.starts_with("$") {
686 Variable::new(&s[1..]).map(Subject::Variable)
687 } else if s.starts_with("_:") {
688 BlankNode::new(s).map(Subject::BlankNode)
689 } else {
690 NamedNode::new(s).map(Subject::NamedNode)
691 }
692 }
693
694 fn unintern_predicate(
696 &self,
697 interned: &InternedString,
698 ) -> Result<Predicate, crate::OxirsError> {
699 let s = interned.as_str();
700 if s.starts_with("?") || s.starts_with("$") {
701 Variable::new(&s[1..]).map(Predicate::Variable)
702 } else {
703 NamedNode::new(s).map(Predicate::NamedNode)
704 }
705 }
706
707 fn unintern_object(&self, interned: &InternedString) -> Result<Object, crate::OxirsError> {
709 let s = interned.as_str();
710 if s.starts_with("?") || s.starts_with("$") {
711 return Variable::new(&s[1..]).map(Object::Variable);
712 } else if let Some(stripped) = s.strip_prefix("\"") {
713 if let Some(end_quote) = stripped.find('"') {
715 let value = &stripped[..end_quote];
716 return Ok(Object::Literal(Literal::new(value)));
717 }
718 return Ok(Object::Literal(Literal::new(s)));
720 }
721
722 if s.starts_with("_:") {
723 BlankNode::new(s).map(Object::BlankNode)
724 } else {
725 NamedNode::new(s).map(Object::NamedNode)
726 }
727 }
728}
729
730impl Default for OptimizedGraph {
731 fn default() -> Self {
732 Self::new()
733 }
734}
735
/// Queue of pending [`BatchOperation`]s plus a dedicated thread pool that
/// works through them in batches. Only compiled with the `parallel` feature.
#[cfg(feature = "parallel")]
#[derive(Debug)]
pub struct BatchProcessor {
    /// Operations waiting to be processed; filled by `push`, drained by `process_batch`.
    operation_queue: SegQueue<BatchOperation>,
    /// Thread pool on which each batch is executed.
    processing_pool: rayon::ThreadPool,
    /// Figures describing processed batches; returned (as a clone) by `stats`.
    stats: Arc<RwLock<BatchStats>>,
}
747
/// A unit of work that can be queued on a `BatchProcessor`.
#[derive(Debug, Clone)]
pub enum BatchOperation {
    /// Insert the given quad.
    Insert(Quad),
    /// Delete the given quad.
    Delete(Quad),
    /// Replace `old` with `new`.
    Update { old: Quad, new: Quad },
    /// Compaction request; carries no payload.
    Compact,
}
756
/// Figures recorded by `BatchProcessor::process_batch`.
#[derive(Debug, Default, Clone)]
pub struct BatchStats {
    /// Running total of operations taken from the queue over all batches.
    pub operations_processed: usize,
    /// The `batch_size` argument of the most recent non-empty batch.
    pub batch_size: usize,
    /// Wall-clock duration of the most recent non-empty batch, in milliseconds.
    pub processing_time_ms: u64,
    /// Operations per second of the most recent batch with a measurable duration.
    pub throughput_ops_per_sec: f64,
}
765
#[cfg(feature = "parallel")]
impl BatchProcessor {
    /// Creates a processor with an empty queue and a pool of `num_threads` workers.
    ///
    /// # Panics
    /// Panics when the rayon thread pool cannot be built.
    pub fn new(num_threads: usize) -> Self {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .build()
            .expect("thread pool builder should succeed");

        BatchProcessor {
            operation_queue: SegQueue::new(),
            processing_pool: pool,
            stats: Arc::new(RwLock::new(BatchStats::default())),
        }
    }

    /// Appends `operation` to the pending queue.
    pub fn push(&self, operation: BatchOperation) {
        self.operation_queue.push(operation);
    }

    /// Takes up to `batch_size` operations from the queue, runs them on the
    /// pool and returns how many were taken.
    ///
    /// Returns `Ok(0)` without touching the statistics when the queue is
    /// empty. The per-operation handlers are currently empty placeholders.
    pub fn process_batch(&self, batch_size: usize) -> Result<usize, crate::OxirsError> {
        let start_time = std::time::Instant::now();

        // Collect lazily instead of pre-allocating `batch_size` slots: callers
        // may pass a very large limit (e.g. `usize::MAX`) to mean "everything
        // pending", which must not translate into a huge up-front allocation.
        let operations: Vec<BatchOperation> = std::iter::from_fn(|| self.operation_queue.pop())
            .take(batch_size)
            .collect();

        if operations.is_empty() {
            return Ok(0);
        }

        let operations_count = operations.len();

        self.processing_pool.install(|| {
            operations.par_iter().for_each(|operation| {
                // Placeholder handlers: no operation has an effect yet.
                match operation {
                    BatchOperation::Insert(_quad) => {}
                    BatchOperation::Delete(_quad) => {}
                    BatchOperation::Update {
                        old: _old,
                        new: _new,
                    } => {}
                    BatchOperation::Compact => {}
                }
            });
        });

        let processing_time = start_time.elapsed();
        let elapsed_secs = processing_time.as_secs_f64();
        {
            let mut stats = self.stats.write();
            stats.operations_processed += operations_count;
            stats.batch_size = batch_size;
            // Saturate rather than truncate on the (theoretical) u128 -> u64 overflow.
            stats.processing_time_ms =
                u64::try_from(processing_time.as_millis()).unwrap_or(u64::MAX);
            // Keep the previous throughput when the batch was too fast to measure.
            if elapsed_secs > 0.0 {
                stats.throughput_ops_per_sec = operations_count as f64 / elapsed_secs;
            }
        }

        Ok(operations_count)
    }

    /// Returns a copy of the current statistics.
    pub fn stats(&self) -> BatchStats {
        self.stats.read().clone()
    }

    /// Number of operations currently waiting in the queue.
    pub fn pending_operations(&self) -> usize {
        self.operation_queue.len()
    }
}
856
/// The default processor uses one worker per CPU reported by `num_cpus::get()`.
#[cfg(feature = "parallel")]
impl Default for BatchProcessor {
    fn default() -> Self {
        let worker_count = num_cpus::get();
        BatchProcessor::new(worker_count)
    }
}
863
/// Fast-path string helpers, with an optional SIMD implementation behind the
/// `simd` feature. Both builds of `validate_iri_fast` apply the same rules.
pub mod simd {
    #[cfg(feature = "simd")]
    use wide::{u8x32, CmpEq};

    /// Printable ASCII characters that make an IRI invalid.
    const FORBIDDEN_ASCII: [u8; 10] = [
        b'<', b'>', b'"', b'{', b'}', b'|', b'\\', b'^', b'`', b' ',
    ];

    /// `true` for an ASCII control byte (0..=31, 127) or a forbidden ASCII character.
    fn is_rejected_ascii(byte: u8) -> bool {
        byte.is_ascii_control() || FORBIDDEN_ASCII.contains(&byte)
    }

    /// `true` when `bytes` (valid UTF-8) contains a C1 control character
    /// U+0080..=U+009F, which is encoded as `0xC2` followed by `0x80..=0x9F`.
    ///
    /// The check must look at the pair: bytes `0x80..=0x9F` on their own are
    /// ordinary continuation bytes of many valid non-ASCII characters.
    fn contains_c1_control(bytes: &[u8]) -> bool {
        bytes
            .windows(2)
            .any(|pair| pair[0] == 0xC2 && (0x80..=0x9F).contains(&pair[1]))
    }

    /// Cheap IRI plausibility check (SIMD build).
    ///
    /// Returns `false` for the empty string and for any string containing a
    /// control character (C0, DEL or C1) or one of `< > " { } | \ ^ `` ` and
    /// space; `true` otherwise. Non-ASCII characters are accepted.
    #[cfg(feature = "simd")]
    pub fn validate_iri_fast(iri: &str) -> bool {
        let bytes = iri.as_bytes();
        if bytes.is_empty() {
            return false;
        }

        // Full 32-byte lanes go through the vector comparison, the rest is
        // checked byte by byte.
        let (body, tail) = bytes.split_at(bytes.len() - bytes.len() % 32);

        for chunk in body.chunks_exact(32) {
            let mut lanes = [0u8; 32];
            lanes.copy_from_slice(chunk);
            let data = u8x32::from(lanes);

            for &forbidden in &FORBIDDEN_ASCII {
                let forbidden_vec = u8x32::splat(forbidden);
                let hits = data.simd_eq(forbidden_vec);
                if hits.any() {
                    return false;
                }
            }

            if chunk.iter().any(|byte| byte.is_ascii_control()) {
                return false;
            }
        }

        if tail.iter().any(|&byte| is_rejected_ascii(byte)) {
            return false;
        }

        !contains_c1_control(bytes)
    }

    /// Cheap IRI plausibility check (scalar build).
    ///
    /// Returns `false` for the empty string and for any string containing a
    /// control character (C0, DEL or C1) or one of `< > " { } | \ ^ `` ` and
    /// space; `true` otherwise. Non-ASCII characters are accepted.
    #[cfg(not(feature = "simd"))]
    pub fn validate_iri_fast(iri: &str) -> bool {
        let bytes = iri.as_bytes();
        !bytes.is_empty()
            && !bytes.iter().any(|&byte| is_rejected_ascii(byte))
            && !contains_c1_control(bytes)
    }

    /// Orders two strings by length first and, for equal lengths, by their bytes.
    ///
    /// This is *not* lexicographic order: a shorter string always sorts
    /// before a longer one.
    pub fn compare_strings_fast(a: &str, b: &str) -> std::cmp::Ordering {
        a.len()
            .cmp(&b.len())
            .then_with(|| a.as_bytes().cmp(b.as_bytes()))
    }
}
985
/// Unit tests for the arena, the borrowed term/triple views, both graph
/// implementations and the `simd` helpers.
#[cfg(test)]
mod tests {
    use super::*;

    /// `alloc_str` returns equal copies and updates both counters.
    #[test]
    fn test_rdf_arena() {
        let arena = RdfArena::new();

        let s1 = arena.alloc_str("test string 1");
        let s2 = arena.alloc_str("test string 2");

        assert_eq!(s1, "test string 1");
        assert_eq!(s2, "test string 2");
        assert!(arena.allocated_bytes() > 0);
        assert_eq!(arena.allocation_count(), 2);
    }

    /// A named node survives the borrowed view and the conversion back.
    #[test]
    fn test_term_ref() {
        let node = NamedNode::new("http://example.org/test").unwrap();
        let term_ref = TermRef::from_named_node(&node);

        assert!(term_ref.is_named_node());
        assert_eq!(term_ref.as_str(), "http://example.org/test");

        let owned = term_ref.to_owned().unwrap();
        assert!(owned.is_named_node());
    }

    /// A triple with a plain literal object round-trips through `TripleRef`.
    #[test]
    fn test_triple_ref() {
        let subject = NamedNode::new("http://example.org/s").unwrap();
        let predicate = NamedNode::new("http://example.org/p").unwrap();
        let object = Literal::new("test object");
        let triple = Triple::new(subject, predicate, object);

        let triple_ref = TripleRef::from_triple(&triple);
        assert!(triple_ref.subject.is_named_node());
        assert!(triple_ref.predicate.is_named_node());
        assert!(triple_ref.object.is_literal());

        let owned = triple_ref.to_owned().unwrap();
        assert_eq!(owned, triple);
    }

    /// Inserting into `LockFreeGraph` is idempotent and visible to `len`/`contains`.
    #[test]
    fn test_lock_free_graph() {
        let graph = LockFreeGraph::new();
        assert!(graph.is_empty());

        let subject = NamedNode::new("http://example.org/s").unwrap();
        let predicate = NamedNode::new("http://example.org/p").unwrap();
        let object = Literal::new("test object");
        let triple = Triple::new(subject, predicate, object);

        assert!(graph.insert(triple.clone()));
        // A second insert of the same triple must be reported as a duplicate.
        assert!(!graph.insert(triple.clone()));
        assert_eq!(graph.len(), 1);
        assert!(graph.contains(&triple));
    }

    /// `OptimizedGraph` rejects duplicates and answers bound and partly bound queries.
    #[test]
    fn test_optimized_graph() {
        let graph = OptimizedGraph::new();

        let subject = NamedNode::new("http://example.org/s").unwrap();
        let predicate = NamedNode::new("http://example.org/p").unwrap();
        let object = Literal::new("test object");
        let triple = Triple::new(subject.clone(), predicate.clone(), object.clone());

        assert!(graph.insert(&triple));
        // A second insert of the same triple must be reported as a duplicate.
        assert!(!graph.insert(&triple));

        // Fully bound pattern.
        let results = graph.query(
            Some(&Subject::NamedNode(subject.clone())),
            Some(&Predicate::NamedNode(predicate.clone())),
            Some(&Object::Literal(object.clone())),
        );
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], triple);

        // Subject-only pattern.
        let results = graph.query(Some(&Subject::NamedNode(subject)), None, None);
        assert_eq!(results.len(), 1);

        let stats = graph.stats();
        assert_eq!(stats.triple_count, 1);
    }

    /// Plain IRIs pass; empty input, angle brackets and spaces are rejected.
    #[test]
    fn test_simd_iri_validation() {
        assert!(simd::validate_iri_fast("http://example.org/test"));
        assert!(!simd::validate_iri_fast("http://example.org/<invalid>"));
        assert!(!simd::validate_iri_fast(""));
        assert!(!simd::validate_iri_fast(
            "http://example.org/test with spaces"
        ));
    }

    /// Equal-length strings compare bytewise; a shorter string sorts first.
    #[test]
    fn test_simd_string_comparison() {
        assert_eq!(
            simd::compare_strings_fast("abc", "abc"),
            std::cmp::Ordering::Equal
        );
        assert_eq!(
            simd::compare_strings_fast("abc", "def"),
            std::cmp::Ordering::Less
        );
        assert_eq!(
            simd::compare_strings_fast("def", "abc"),
            std::cmp::Ordering::Greater
        );
        assert_eq!(
            simd::compare_strings_fast("short", "longer"),
            std::cmp::Ordering::Less
        );
    }

    /// `reset` brings both counters back to zero.
    #[test]
    fn test_arena_reset() {
        let arena = RdfArena::new();

        arena.alloc_str("test");
        assert!(arena.allocated_bytes() > 0);

        arena.reset();
        assert_eq!(arena.allocated_bytes(), 0);
        assert_eq!(arena.allocation_count(), 0);
    }

    /// Ten threads insert ten distinct triples; all succeed and all are counted.
    #[test]
    fn test_concurrent_optimized_graph() {
        use std::sync::Arc;
        use std::thread;

        let graph = Arc::new(OptimizedGraph::new());
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let graph = Arc::clone(&graph);
                thread::spawn(move || {
                    let subject = NamedNode::new(format!("http://example.org/s{i}")).unwrap();
                    let predicate = NamedNode::new("http://example.org/p").unwrap();
                    let object = Literal::new(format!("object{i}"));
                    let triple = Triple::new(subject, predicate, object);

                    graph.insert(&triple)
                })
            })
            .collect();

        let results: Vec<bool> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        assert!(results.iter().all(|&inserted| inserted));

        let stats = graph.stats();
        assert_eq!(stats.triple_count, 10);
    }
}
1145
/// Fixed-capacity, zero-initialised byte buffer with a separate fill level.
///
/// The storage is allocated once and never grows; `len` tracks how many
/// leading bytes count as written.
pub struct ZeroCopyBuffer {
    /// Backing storage; its length is the buffer's capacity.
    data: Pin<Box<[u8]>>,
    /// Number of leading bytes of `data` that are considered filled.
    len: usize,
}

impl ZeroCopyBuffer {
    /// Creates an empty buffer of `capacity` bytes (same as [`Self::with_capacity`]).
    pub fn new(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }

    /// Creates an empty buffer backed by `capacity` zeroed bytes.
    pub fn with_capacity(capacity: usize) -> Self {
        let storage = vec![0u8; capacity].into_boxed_slice();

        ZeroCopyBuffer {
            data: Pin::new(storage),
            len: 0,
        }
    }

    /// The filled part of the buffer.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[..self.len]
    }

    /// The *entire* backing storage, including the unfilled part, so callers
    /// can write into it directly and then publish the bytes with `set_len`.
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
        &mut self.data[..]
    }

    /// Total number of bytes the buffer can hold.
    pub fn capacity(&self) -> usize {
        self.data.len()
    }

    /// Number of filled bytes.
    pub fn len(&self) -> usize {
        self.len
    }

    /// `true` when no bytes are filled.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Marks the buffer as empty; the stored bytes are not zeroed.
    pub fn clear(&mut self) {
        self.len = 0;
    }

    /// Same as [`Self::clear`].
    pub fn reset(&mut self) {
        self.clear();
    }

    /// Sets the fill level to `len`.
    ///
    /// # Panics
    /// Panics when `len` exceeds the capacity.
    pub fn set_len(&mut self, len: usize) {
        assert!(len <= self.capacity());
        self.len = len;
    }

    /// Appends as much of `data` as fits and returns the number of bytes copied.
    ///
    /// An empty `data` is a successful zero-length write, regardless of the
    /// fill level.
    ///
    /// # Errors
    /// Returns `ErrorKind::WriteZero` when `data` is non-empty and the buffer
    /// has no free space left.
    pub fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
        if data.is_empty() {
            return Ok(0);
        }

        let start = self.len;
        let to_write = data.len().min(self.capacity() - start);

        if to_write == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::WriteZero,
                "Buffer is full",
            ));
        }

        self.data[start..start + to_write].copy_from_slice(&data[..to_write]);
        self.len += to_write;
        Ok(to_write)
    }
}
1232
/// Stateless facade over the `simd_json` and `serde_json` parsing entry points.
#[derive(Clone)]
pub struct SimdJsonProcessor;

impl SimdJsonProcessor {
    /// Creates the (field-less) processor.
    pub fn new() -> Self {
        SimdJsonProcessor
    }

    /// Parses `json` with `simd_json::to_borrowed_value`; the returned value
    /// borrows from `json`, which is handed over mutably.
    pub fn parse<'a>(
        &mut self,
        json: &'a mut [u8],
    ) -> Result<simd_json::BorrowedValue<'a>, simd_json::Error> {
        simd_json::to_borrowed_value(json)
    }

    /// Same as [`Self::parse`], starting from a mutable string slice.
    pub fn parse_str<'a>(
        &mut self,
        json: &'a mut str,
    ) -> Result<simd_json::BorrowedValue<'a>, simd_json::Error> {
        // SAFETY: NOTE(review) - `as_bytes_mut` requires the bytes to be valid
        // UTF-8 again before the borrow ends. The parser receives the bytes
        // mutably, so whether that holds depends on what it writes into the
        // buffer; confirm against the simd_json documentation.
        let bytes = unsafe { json.as_bytes_mut() };
        simd_json::to_borrowed_value(bytes)
    }

    /// Parses `json` with `simd_json::to_owned_value` into a value that does
    /// not borrow from the input.
    pub fn parse_owned(
        &mut self,
        json: &mut [u8],
    ) -> Result<simd_json::OwnedValue, simd_json::Error> {
        simd_json::to_owned_value(json)
    }

    /// Parses `json` with `serde_json::from_slice`; this path does not use
    /// `simd_json` and leaves the input untouched.
    pub fn parse_json(&self, json: &[u8]) -> Result<serde_json::Value, serde_json::Error> {
        serde_json::from_slice(json)
    }
}

/// The default processor is the one built by `SimdJsonProcessor::new`.
impl Default for SimdJsonProcessor {
    fn default() -> Self {
        Self::new()
    }
}
1279
/// Byte-level helpers for XML scanning, using SSE2 on x86_64 and a scalar
/// fallback on other architectures.
#[derive(Clone, Debug)]
pub struct SimdXmlProcessor {
    /// Scratch buffer; in this file it is only sized, never read or written.
    scan_buffer: Vec<u8>,
}

impl SimdXmlProcessor {
    /// Creates a processor with a 4096-byte scratch buffer.
    pub fn new() -> Self {
        SimdXmlProcessor {
            scan_buffer: Vec::with_capacity(4096),
        }
    }

    /// Returns the index of the first `<`, `>`, `&`, `"` or `'` in `data`.
    ///
    /// Whole 16-byte blocks are scanned with SSE2, the remaining bytes one by one.
    #[cfg(target_arch = "x86_64")]
    pub fn find_special_char(&self, data: &[u8]) -> Option<usize> {
        use std::arch::x86_64::*;

        const CHUNK_SIZE: usize = 16;
        let mut offset = 0;

        if data.len() >= CHUNK_SIZE {
            // SAFETY: SSE2 is part of the x86_64 baseline, so the intrinsics
            // are always available. `_mm_loadu_si128` is an unaligned load of
            // 16 bytes, and the loop condition guarantees
            // `offset + 16 <= data.len()`, so every load stays inside `data`.
            unsafe {
                let lt = _mm_set1_epi8(b'<' as i8);
                let gt = _mm_set1_epi8(b'>' as i8);
                let amp = _mm_set1_epi8(b'&' as i8);
                let quot = _mm_set1_epi8(b'"' as i8);
                let apos = _mm_set1_epi8(b'\'' as i8);

                while offset + CHUNK_SIZE <= data.len() {
                    let chunk = _mm_loadu_si128(data.as_ptr().add(offset) as *const __m128i);

                    let eq_lt = _mm_cmpeq_epi8(chunk, lt);
                    let eq_gt = _mm_cmpeq_epi8(chunk, gt);
                    let eq_amp = _mm_cmpeq_epi8(chunk, amp);
                    let eq_quot = _mm_cmpeq_epi8(chunk, quot);
                    let eq_apos = _mm_cmpeq_epi8(chunk, apos);

                    let any_match = _mm_or_si128(
                        _mm_or_si128(_mm_or_si128(eq_lt, eq_gt), eq_amp),
                        _mm_or_si128(eq_quot, eq_apos),
                    );

                    // One bit per byte; the lowest set bit is the first match.
                    let mask = _mm_movemask_epi8(any_match);
                    if mask != 0 {
                        return Some(offset + mask.trailing_zeros() as usize);
                    }

                    offset += CHUNK_SIZE;
                }
            }
        }

        data[offset..]
            .iter()
            .position(|&b| matches!(b, b'<' | b'>' | b'&' | b'"' | b'\''))
            .map(|i| i + offset)
    }

    /// Returns the index of the first `<`, `>`, `&`, `"` or `'` in `data`.
    #[cfg(not(target_arch = "x86_64"))]
    pub fn find_special_char(&self, data: &[u8]) -> Option<usize> {
        data.iter()
            .position(|&b| matches!(b, b'<' | b'>' | b'&' | b'"' | b'\''))
    }

    /// `true` when `data` is valid UTF-8.
    pub fn is_valid_utf8(&self, data: &[u8]) -> bool {
        std::str::from_utf8(data).is_ok()
    }

    /// Strips leading and trailing spaces, tabs, line feeds and carriage
    /// returns; returns an empty slice when nothing else remains.
    pub fn trim_whitespace<'a>(&self, data: &'a [u8]) -> &'a [u8] {
        let start = data
            .iter()
            .position(|&b| !matches!(b, b' ' | b'\t' | b'\n' | b'\r'))
            .unwrap_or(data.len());
        let end = data
            .iter()
            .rposition(|&b| !matches!(b, b' ' | b'\t' | b'\n' | b'\r'))
            .map(|i| i + 1)
            .unwrap_or(0);

        if start >= end {
            &[]
        } else {
            &data[start..end]
        }
    }

    /// Returns the index of the first `:` in `data`.
    ///
    /// Whole 16-byte blocks are scanned with SSE2, the remaining bytes one by one.
    #[cfg(target_arch = "x86_64")]
    pub fn find_colon(&self, data: &[u8]) -> Option<usize> {
        use std::arch::x86_64::*;

        const CHUNK_SIZE: usize = 16;
        let mut offset = 0;

        if data.len() >= CHUNK_SIZE {
            // SAFETY: SSE2 is part of the x86_64 baseline; each unaligned
            // 16-byte load is bounded by `offset + 16 <= data.len()`.
            unsafe {
                let colon = _mm_set1_epi8(b':' as i8);

                while offset + CHUNK_SIZE <= data.len() {
                    let chunk = _mm_loadu_si128(data.as_ptr().add(offset) as *const __m128i);
                    let eq = _mm_cmpeq_epi8(chunk, colon);
                    let mask = _mm_movemask_epi8(eq);

                    if mask != 0 {
                        return Some(offset + mask.trailing_zeros() as usize);
                    }

                    offset += CHUNK_SIZE;
                }
            }
        }

        data[offset..]
            .iter()
            .position(|&b| b == b':')
            .map(|i| i + offset)
    }

    /// Returns the index of the first `:` in `data`.
    #[cfg(not(target_arch = "x86_64"))]
    pub fn find_colon(&self, data: &[u8]) -> Option<usize> {
        data.iter().position(|&b| b == b':')
    }

    /// Splits `qname` at its first colon into `(prefix, local)`; without a
    /// colon the prefix is empty and the whole input is the local part.
    pub fn parse_qname<'a>(&self, qname: &'a [u8]) -> (&'a [u8], &'a [u8]) {
        match self.find_colon(qname) {
            Some(pos) => (&qname[..pos], &qname[pos + 1..]),
            None => (&[], qname),
        }
    }

    /// Concatenates the namespace registered for `prefix` with `local`.
    ///
    /// Returns `None` when either part is not valid UTF-8 or the prefix is
    /// not present in `namespaces`.
    pub fn expand_name<'a>(
        &self,
        prefix: &'a [u8],
        local: &'a [u8],
        namespaces: &HashMap<String, String>,
    ) -> Option<String> {
        let prefix_str = std::str::from_utf8(prefix).ok()?;
        let local_str = std::str::from_utf8(local).ok()?;

        namespaces
            .get(prefix_str)
            .map(|ns| format!("{}{}", ns, local_str))
    }

    /// Grows the scratch buffer so that its capacity is at least `capacity`.
    pub fn ensure_buffer_capacity(&mut self, capacity: usize) {
        if self.scan_buffer.capacity() < capacity {
            // `Vec::reserve` counts from the current *length*, not from the
            // current capacity, so the shortfall has to be computed against
            // `len()`. `capacity > self.capacity() >= len()` rules out underflow.
            let additional = capacity - self.scan_buffer.len();
            self.scan_buffer.reserve(additional);
        }
    }
}

/// The default processor is the one built by [`SimdXmlProcessor::new`].
impl Default for SimdXmlProcessor {
    fn default() -> Self {
        Self::new()
    }
}