1use crate::interning::{InternedString, StringInterner};
8use crate::model::*;
9use bumpalo::Bump;
10use crossbeam::epoch::{self, Atomic, Owned};
11use crossbeam::queue::SegQueue;
12use dashmap::DashMap;
13use parking_lot::RwLock;
14#[cfg(feature = "parallel")]
15use rayon::iter::IntoParallelRefIterator;
16#[cfg(feature = "parallel")]
17use rayon::iter::ParallelIterator;
18use simd_json;
19use std::collections::BTreeSet;
20use std::pin::Pin;
21use std::sync::atomic::Ordering;
22use std::sync::Arc;
23
/// Alias used when a [`StringInterner`] is used to build RDF terms.
pub type TermInterner = StringInterner;

/// Term constructors that route their string inputs through an interner
/// before building the corresponding RDF term.
pub trait TermInternerExt {
    /// Interns `iri` and builds a [`NamedNode`] from the interned string.
    ///
    /// # Errors
    /// Propagates the error from `NamedNode::new` when the IRI is rejected.
    fn intern_named_node(&self, iri: &str) -> Result<NamedNode, crate::OxirsError>;

    /// Returns a blank node for use alongside interned terms.
    ///
    /// The implementation for [`TermInterner`] in this file calls
    /// `BlankNode::new_unique()` and does not intern anything.
    fn intern_blank_node(&self) -> BlankNode;

    /// Interns `value` and builds a simple literal from it.
    ///
    /// # Errors
    /// The implementation for [`TermInterner`] in this file always returns `Ok`.
    fn intern_literal(&self, value: &str) -> Result<Literal, crate::OxirsError>;

    /// Interns `value` and `datatype_iri` and builds a typed literal.
    ///
    /// # Errors
    /// Fails when `datatype_iri` is rejected by `NamedNode::new`.
    fn intern_literal_with_datatype(
        &self,
        value: &str,
        datatype_iri: &str,
    ) -> Result<Literal, crate::OxirsError>;

    /// Interns `value` and `language` and builds a language-tagged literal.
    ///
    /// # Errors
    /// Fails when `Literal::new_language_tagged_literal` rejects the input.
    fn intern_literal_with_language(
        &self,
        value: &str,
        language: &str,
    ) -> Result<Literal, crate::OxirsError>;
}
52
53impl TermInternerExt for TermInterner {
54 fn intern_named_node(&self, iri: &str) -> Result<NamedNode, crate::OxirsError> {
55 let interned = self.intern(iri);
57 NamedNode::new(interned.as_ref())
59 }
60
61 fn intern_blank_node(&self) -> BlankNode {
62 BlankNode::new_unique()
64 }
65
66 fn intern_literal(&self, value: &str) -> Result<Literal, crate::OxirsError> {
67 let interned = self.intern(value);
69 Ok(Literal::new_simple_literal(interned.as_ref()))
71 }
72
73 fn intern_literal_with_datatype(
74 &self,
75 value: &str,
76 datatype_iri: &str,
77 ) -> Result<Literal, crate::OxirsError> {
78 let value_interned = self.intern(value);
80 let datatype_interned = self.intern(datatype_iri);
81 let datatype_node = NamedNode::new(datatype_interned.as_ref())?;
83 Ok(Literal::new_typed_literal(
84 value_interned.as_ref(),
85 datatype_node,
86 ))
87 }
88
89 fn intern_literal_with_language(
90 &self,
91 value: &str,
92 language: &str,
93 ) -> Result<Literal, crate::OxirsError> {
94 let value_interned = self.intern(value);
96 let language_interned = self.intern(language);
97 let literal = Literal::new_language_tagged_literal(
99 value_interned.as_ref(),
100 language_interned.as_ref(),
101 )?;
102 Ok(literal)
103 }
104}
105
106#[derive(Debug)]
110pub struct RdfArena {
111 arena: std::sync::Mutex<Bump>,
113 interner: StringInterner,
115 allocated_bytes: std::sync::atomic::AtomicUsize,
117 allocation_count: std::sync::atomic::AtomicUsize,
118}
119
120impl RdfArena {
121 pub fn new() -> Self {
123 RdfArena {
124 arena: std::sync::Mutex::new(Bump::new()),
125 interner: StringInterner::new(),
126 allocated_bytes: std::sync::atomic::AtomicUsize::new(0),
127 allocation_count: std::sync::atomic::AtomicUsize::new(0),
128 }
129 }
130
131 pub fn with_capacity(capacity: usize) -> Self {
133 RdfArena {
134 arena: std::sync::Mutex::new(Bump::with_capacity(capacity)),
135 interner: StringInterner::new(),
136 allocated_bytes: std::sync::atomic::AtomicUsize::new(0),
137 allocation_count: std::sync::atomic::AtomicUsize::new(0),
138 }
139 }
140
141 pub fn alloc_str(&self, s: &str) -> String {
143 self.allocated_bytes
146 .fetch_add(s.len(), std::sync::atomic::Ordering::Relaxed);
147 self.allocation_count
148 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
149 s.to_string()
150 }
151
152 pub fn intern_str(&self, s: &str) -> InternedString {
154 InternedString::new_with_interner(s, &self.interner)
155 }
156
157 pub fn reset(&self) {
159 if let Ok(mut arena) = self.arena.lock() {
160 arena.reset();
161 self.allocated_bytes
162 .store(0, std::sync::atomic::Ordering::Relaxed);
163 self.allocation_count
164 .store(0, std::sync::atomic::Ordering::Relaxed);
165 }
166 }
167
168 pub fn allocated_bytes(&self) -> usize {
170 self.allocated_bytes
171 .load(std::sync::atomic::Ordering::Relaxed)
172 }
173
174 pub fn allocation_count(&self) -> usize {
176 self.allocation_count
177 .load(std::sync::atomic::Ordering::Relaxed)
178 }
179}
180
181impl Default for RdfArena {
182 fn default() -> Self {
183 Self::new()
184 }
185}
186
187#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
191pub enum TermRef<'a> {
192 NamedNode(&'a str),
193 BlankNode(&'a str),
194 Literal(&'a str, Option<&'a str>, Option<&'a str>), Variable(&'a str),
196}
197
198impl<'a> TermRef<'a> {
199 pub fn from_named_node(node: &'a NamedNode) -> Self {
201 TermRef::NamedNode(node.as_str())
202 }
203
204 pub fn from_blank_node(node: &'a BlankNode) -> Self {
206 TermRef::BlankNode(node.as_str())
207 }
208
209 pub fn from_literal(literal: &'a Literal) -> Self {
211 let language = literal.language();
212 TermRef::Literal(literal.value(), None, language)
215 }
216
217 pub fn as_str(&self) -> &'a str {
219 match self {
220 TermRef::NamedNode(s) => s,
221 TermRef::BlankNode(s) => s,
222 TermRef::Literal(s, _, _) => s,
223 TermRef::Variable(s) => s,
224 }
225 }
226
227 pub fn to_owned(&self) -> Result<Term, crate::OxirsError> {
229 match self {
230 TermRef::NamedNode(iri) => NamedNode::new(*iri).map(Term::NamedNode),
231 TermRef::BlankNode(id) => BlankNode::new(*id).map(Term::BlankNode),
232 TermRef::Literal(value, datatype, language) => {
233 let literal = if let Some(lang) = language {
234 Literal::new_lang(*value, *lang)?
235 } else if let Some(dt) = datatype {
236 let dt_node = NamedNode::new(*dt)?;
237 Literal::new_typed(*value, dt_node)
238 } else {
239 Literal::new(*value)
240 };
241 Ok(Term::Literal(literal))
242 }
243 TermRef::Variable(name) => Variable::new(*name).map(Term::Variable),
244 }
245 }
246
247 pub fn is_named_node(&self) -> bool {
249 matches!(self, TermRef::NamedNode(_))
250 }
251
252 pub fn is_blank_node(&self) -> bool {
254 matches!(self, TermRef::BlankNode(_))
255 }
256
257 pub fn is_literal(&self) -> bool {
259 matches!(self, TermRef::Literal(_, _, _))
260 }
261
262 pub fn is_variable(&self) -> bool {
264 matches!(self, TermRef::Variable(_))
265 }
266}
267
268impl<'a> std::fmt::Display for TermRef<'a> {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 match self {
271 TermRef::NamedNode(iri) => write!(f, "<{iri}>"),
272 TermRef::BlankNode(id) => write!(f, "{id}"),
273 TermRef::Literal(value, datatype, language) => {
274 write!(f, "\"{value}\"")?;
275 if let Some(lang) = language {
276 write!(f, "@{lang}")?;
277 } else if let Some(dt) = datatype {
278 write!(f, "^^<{dt}>")?;
279 }
280 Ok(())
281 }
282 TermRef::Variable(name) => write!(f, "?{name}"),
283 }
284 }
285}
286
287#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
289pub struct TripleRef<'a> {
290 pub subject: TermRef<'a>,
291 pub predicate: TermRef<'a>,
292 pub object: TermRef<'a>,
293}
294
295impl<'a> TripleRef<'a> {
296 pub fn new(subject: TermRef<'a>, predicate: TermRef<'a>, object: TermRef<'a>) -> Self {
298 TripleRef {
299 subject,
300 predicate,
301 object,
302 }
303 }
304
305 pub fn from_triple(triple: &'a Triple) -> Self {
307 TripleRef {
308 subject: match triple.subject() {
309 Subject::NamedNode(n) => TermRef::NamedNode(n.as_str()),
310 Subject::BlankNode(b) => TermRef::BlankNode(b.as_str()),
311 Subject::Variable(v) => TermRef::Variable(v.as_str()),
312 Subject::QuotedTriple(_) => TermRef::NamedNode("<<quoted-triple>>"),
313 },
314 predicate: match triple.predicate() {
315 Predicate::NamedNode(n) => TermRef::NamedNode(n.as_str()),
316 Predicate::Variable(v) => TermRef::Variable(v.as_str()),
317 },
318 object: match triple.object() {
319 Object::NamedNode(n) => TermRef::NamedNode(n.as_str()),
320 Object::BlankNode(b) => TermRef::BlankNode(b.as_str()),
321 Object::Literal(l) => TermRef::from_literal(l),
322 Object::Variable(v) => TermRef::Variable(v.as_str()),
323 Object::QuotedTriple(_) => TermRef::NamedNode("<<quoted-triple>>"),
324 },
325 }
326 }
327
328 pub fn to_owned(&self) -> Result<Triple, crate::OxirsError> {
330 let subject = match self.subject.to_owned()? {
331 Term::NamedNode(n) => Subject::NamedNode(n),
332 Term::BlankNode(b) => Subject::BlankNode(b),
333 _ => return Err(crate::OxirsError::Parse("Invalid subject term".to_string())),
334 };
335
336 let predicate = match self.predicate.to_owned()? {
337 Term::NamedNode(n) => Predicate::NamedNode(n),
338 _ => {
339 return Err(crate::OxirsError::Parse(
340 "Invalid predicate term".to_string(),
341 ))
342 }
343 };
344
345 let object = match self.object.to_owned()? {
346 Term::NamedNode(n) => Object::NamedNode(n),
347 Term::BlankNode(b) => Object::BlankNode(b),
348 Term::Literal(l) => Object::Literal(l),
349 _ => return Err(crate::OxirsError::Parse("Invalid object term".to_string())),
350 };
351
352 Ok(Triple::new(subject, predicate, object))
353 }
354}
355
356impl<'a> std::fmt::Display for TripleRef<'a> {
357 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358 write!(f, "{} {} {} .", self.subject, self.predicate, self.object)
359 }
360}
361
362#[derive(Debug)]
364pub struct LockFreeGraph {
365 data: Atomic<GraphData>,
367 epoch: epoch::Guard,
369}
370
371#[derive(Debug)]
373struct GraphData {
374 triples: BTreeSet<Triple>,
376 version: u64,
378}
379
380impl LockFreeGraph {
381 pub fn new() -> Self {
383 let initial_data = GraphData {
384 triples: BTreeSet::new(),
385 version: 0,
386 };
387
388 LockFreeGraph {
389 data: Atomic::new(initial_data),
390 epoch: epoch::pin(),
391 }
392 }
393
394 pub fn insert(&self, triple: Triple) -> bool {
396 loop {
397 let current = self.data.load(Ordering::Acquire, &self.epoch);
398 let current_ref = unsafe { current.deref() };
399
400 if current_ref.triples.contains(&triple) {
402 return false;
403 }
404
405 let mut new_triples = current_ref.triples.clone();
407 new_triples.insert(triple.clone());
408
409 let new_data = GraphData {
410 triples: new_triples,
411 version: current_ref.version + 1,
412 };
413
414 match self.data.compare_exchange_weak(
416 current,
417 Owned::new(new_data),
418 Ordering::Release,
419 Ordering::Relaxed,
420 &self.epoch,
421 ) {
422 Ok(_) => {
423 unsafe {
425 self.epoch.defer_destroy(current);
426 }
427 return true;
428 }
429 Err(_) => {
430 continue;
432 }
433 }
434 }
435 }
436
437 pub fn len(&self) -> usize {
439 let current = self.data.load(Ordering::Acquire, &self.epoch);
440 unsafe { current.deref().triples.len() }
441 }
442
443 pub fn is_empty(&self) -> bool {
445 self.len() == 0
446 }
447
448 pub fn contains(&self, triple: &Triple) -> bool {
450 let current = self.data.load(Ordering::Acquire, &self.epoch);
451 unsafe { current.deref().triples.contains(triple) }
452 }
453}
454
455impl Default for LockFreeGraph {
456 fn default() -> Self {
457 Self::new()
458 }
459}
460
461#[derive(Debug)]
463pub struct OptimizedGraph {
464 spo: DashMap<InternedString, DashMap<InternedString, BTreeSet<InternedString>>>,
466 pos: DashMap<InternedString, DashMap<InternedString, BTreeSet<InternedString>>>,
468 osp: DashMap<InternedString, DashMap<InternedString, BTreeSet<InternedString>>>,
470 interner: Arc<StringInterner>,
472 stats: Arc<RwLock<GraphStats>>,
474}
475
476#[derive(Debug, Clone, Default)]
478pub struct GraphStats {
479 pub triple_count: usize,
480 pub unique_subjects: usize,
481 pub unique_predicates: usize,
482 pub unique_objects: usize,
483 pub index_memory_usage: usize,
484 pub intern_hit_ratio: f64,
485}
486
487impl OptimizedGraph {
488 pub fn new() -> Self {
490 OptimizedGraph {
491 spo: DashMap::new(),
492 pos: DashMap::new(),
493 osp: DashMap::new(),
494 interner: Arc::new(StringInterner::new()),
495 stats: Arc::new(RwLock::new(GraphStats::default())),
496 }
497 }
498
499 pub fn insert(&self, triple: &Triple) -> bool {
501 let subject = self.intern_subject(triple.subject());
502 let predicate = self.intern_predicate(triple.predicate());
503 let object = self.intern_object(triple.object());
504
505 let spo_entry = self.spo.entry(subject.clone()).or_default();
507 let mut po_entry = spo_entry.entry(predicate.clone()).or_default();
508 let was_new = po_entry.insert(object.clone());
509
510 if was_new {
511 let pos_entry = self.pos.entry(predicate.clone()).or_default();
513 let mut os_entry = pos_entry.entry(object.clone()).or_default();
514 os_entry.insert(subject.clone());
515
516 let osp_entry = self.osp.entry(object.clone()).or_default();
518 let mut sp_entry = osp_entry.entry(subject.clone()).or_default();
519 sp_entry.insert(predicate);
520
521 {
523 let mut stats = self.stats.write();
524 stats.triple_count += 1;
525 stats.intern_hit_ratio = self.interner.stats().hit_ratio();
526 }
527 }
528
529 was_new
530 }
531
532 pub fn query(
534 &self,
535 subject: Option<&Subject>,
536 predicate: Option<&Predicate>,
537 object: Option<&Object>,
538 ) -> Vec<Triple> {
539 let mut results = Vec::new();
540
541 match (subject.is_some(), predicate.is_some(), object.is_some()) {
543 (true, true, true) => {
544 if let (Some(s), Some(p), Some(o)) = (subject, predicate, object) {
546 let s_intern = self.intern_subject(s);
547 let p_intern = self.intern_predicate(p);
548 let o_intern = self.intern_object(o);
549
550 if let Some(po_map) = self.spo.get(&s_intern) {
551 if let Some(o_set) = po_map.get(&p_intern) {
552 if o_set.contains(&o_intern) {
553 let triple = Triple::new(s.clone(), p.clone(), o.clone());
554 results.push(triple);
555 }
556 }
557 }
558 }
559 }
560 (true, true, false) => {
561 if let (Some(s), Some(p)) = (subject, predicate) {
563 let s_intern = self.intern_subject(s);
564 let p_intern = self.intern_predicate(p);
565
566 if let Some(po_map) = self.spo.get(&s_intern) {
567 if let Some(o_set) = po_map.get(&p_intern) {
568 for o_intern in o_set.iter() {
569 if let Ok(object) = self.unintern_object(o_intern) {
570 let triple = Triple::new(s.clone(), p.clone(), object);
571 results.push(triple);
572 }
573 }
574 }
575 }
576 }
577 }
578 (false, true, true) => {
579 if let (Some(p), Some(o)) = (predicate, object) {
581 let p_intern = self.intern_predicate(p);
582 let o_intern = self.intern_object(o);
583
584 if let Some(os_map) = self.pos.get(&p_intern) {
585 if let Some(s_set) = os_map.get(&o_intern) {
586 for s_intern in s_set.iter() {
587 if let Ok(subject) = self.unintern_subject(s_intern) {
588 let triple = Triple::new(subject, p.clone(), o.clone());
589 results.push(triple);
590 }
591 }
592 }
593 }
594 }
595 }
596 _ => {
597 for s_entry in &self.spo {
599 let s_intern = s_entry.key();
600 if let Ok(s) = self.unintern_subject(s_intern) {
601 if subject.is_some() && subject.unwrap() != &s {
602 continue;
603 }
604
605 for po_entry in s_entry.value().iter() {
606 let p_intern = po_entry.key();
607 if let Ok(p) = self.unintern_predicate(p_intern) {
608 if predicate.is_some() && predicate.unwrap() != &p {
609 continue;
610 }
611
612 for o_intern in po_entry.value().iter() {
613 if let Ok(o) = self.unintern_object(o_intern) {
614 if object.is_some() && object.unwrap() != &o {
615 continue;
616 }
617
618 let triple = Triple::new(s.clone(), p.clone(), o);
619 results.push(triple);
620 }
621 }
622 }
623 }
624 }
625 }
626 }
627 }
628
629 results
630 }
631
632 pub fn stats(&self) -> GraphStats {
634 self.stats.read().clone()
635 }
636
637 fn intern_subject(&self, subject: &Subject) -> InternedString {
639 match subject {
640 Subject::NamedNode(n) => InternedString::new_with_interner(n.as_str(), &self.interner),
641 Subject::BlankNode(b) => InternedString::new_with_interner(b.as_str(), &self.interner),
642 Subject::Variable(v) => InternedString::new_with_interner(v.as_str(), &self.interner),
643 Subject::QuotedTriple(_) => {
644 InternedString::new_with_interner("<<quoted-triple>>", &self.interner)
645 }
646 }
647 }
648
649 fn intern_predicate(&self, predicate: &Predicate) -> InternedString {
651 match predicate {
652 Predicate::NamedNode(n) => {
653 InternedString::new_with_interner(n.as_str(), &self.interner)
654 }
655 Predicate::Variable(v) => InternedString::new_with_interner(v.as_str(), &self.interner),
656 }
657 }
658
659 fn intern_object(&self, object: &Object) -> InternedString {
661 match object {
662 Object::NamedNode(n) => InternedString::new_with_interner(n.as_str(), &self.interner),
663 Object::BlankNode(b) => InternedString::new_with_interner(b.as_str(), &self.interner),
664 Object::Literal(l) => {
665 let serialized = format!("{l}");
667 InternedString::new_with_interner(&serialized, &self.interner)
668 }
669 Object::Variable(v) => InternedString::new_with_interner(v.as_str(), &self.interner),
670 Object::QuotedTriple(_) => {
671 InternedString::new_with_interner("<<quoted-triple>>", &self.interner)
672 }
673 }
674 }
675
676 fn unintern_subject(&self, interned: &InternedString) -> Result<Subject, crate::OxirsError> {
678 let s = interned.as_str();
679 if s.starts_with("?") || s.starts_with("$") {
680 Variable::new(&s[1..]).map(Subject::Variable)
681 } else if s.starts_with("_:") {
682 BlankNode::new(s).map(Subject::BlankNode)
683 } else {
684 NamedNode::new(s).map(Subject::NamedNode)
685 }
686 }
687
688 fn unintern_predicate(
690 &self,
691 interned: &InternedString,
692 ) -> Result<Predicate, crate::OxirsError> {
693 let s = interned.as_str();
694 if s.starts_with("?") || s.starts_with("$") {
695 Variable::new(&s[1..]).map(Predicate::Variable)
696 } else {
697 NamedNode::new(s).map(Predicate::NamedNode)
698 }
699 }
700
701 fn unintern_object(&self, interned: &InternedString) -> Result<Object, crate::OxirsError> {
703 let s = interned.as_str();
704 if s.starts_with("?") || s.starts_with("$") {
705 return Variable::new(&s[1..]).map(Object::Variable);
706 } else if let Some(stripped) = s.strip_prefix("\"") {
707 if let Some(end_quote) = stripped.find('"') {
709 let value = &stripped[..end_quote];
710 return Ok(Object::Literal(Literal::new(value)));
711 }
712 return Ok(Object::Literal(Literal::new(s)));
714 }
715
716 if s.starts_with("_:") {
717 BlankNode::new(s).map(Object::BlankNode)
718 } else {
719 NamedNode::new(s).map(Object::NamedNode)
720 }
721 }
722}
723
724impl Default for OptimizedGraph {
725 fn default() -> Self {
726 Self::new()
727 }
728}
729
730#[cfg(feature = "parallel")]
732#[derive(Debug)]
733pub struct BatchProcessor {
734 operation_queue: SegQueue<BatchOperation>,
736 processing_pool: rayon::ThreadPool,
738 stats: Arc<RwLock<BatchStats>>,
740}
741
742#[derive(Debug, Clone)]
744pub enum BatchOperation {
745 Insert(Quad),
746 Delete(Quad),
747 Update { old: Quad, new: Quad },
748 Compact,
749}
750
751#[derive(Debug, Default, Clone)]
753pub struct BatchStats {
754 pub operations_processed: usize,
755 pub batch_size: usize,
756 pub processing_time_ms: u64,
757 pub throughput_ops_per_sec: f64,
758}
759
760#[cfg(feature = "parallel")]
761impl BatchProcessor {
762 pub fn new(num_threads: usize) -> Self {
764 let pool = rayon::ThreadPoolBuilder::new()
765 .num_threads(num_threads)
766 .build()
767 .unwrap();
768
769 BatchProcessor {
770 operation_queue: SegQueue::new(),
771 processing_pool: pool,
772 stats: Arc::new(RwLock::new(BatchStats::default())),
773 }
774 }
775
776 pub fn push(&self, operation: BatchOperation) {
778 self.operation_queue.push(operation);
779 }
780
781 pub fn process_batch(&self, batch_size: usize) -> Result<usize, crate::OxirsError> {
783 let start_time = std::time::Instant::now();
784 let mut operations = Vec::with_capacity(batch_size);
785
786 for _ in 0..batch_size {
788 if let Some(op) = self.operation_queue.pop() {
789 operations.push(op);
790 } else {
791 break;
792 }
793 }
794
795 if operations.is_empty() {
796 return Ok(0);
797 }
798
799 let operations_count = operations.len();
800
801 self.processing_pool.install(|| {
803 operations.par_iter().for_each(|operation| {
804 match operation {
805 BatchOperation::Insert(_quad) => {
806 }
808 BatchOperation::Delete(_quad) => {
809 }
811 BatchOperation::Update {
812 old: _old,
813 new: _new,
814 } => {
815 }
817 BatchOperation::Compact => {
818 }
820 }
821 });
822 });
823
824 let processing_time = start_time.elapsed();
826 {
827 let mut stats = self.stats.write();
828 stats.operations_processed += operations_count;
829 stats.batch_size = batch_size;
830 stats.processing_time_ms = processing_time.as_millis() as u64;
831 if processing_time.as_secs_f64() > 0.0 {
832 stats.throughput_ops_per_sec =
833 operations_count as f64 / processing_time.as_secs_f64();
834 }
835 }
836
837 Ok(operations_count)
838 }
839
840 pub fn stats(&self) -> BatchStats {
842 self.stats.read().clone()
843 }
844
845 pub fn pending_operations(&self) -> usize {
847 self.operation_queue.len()
848 }
849}
850
851#[cfg(feature = "parallel")]
852impl Default for BatchProcessor {
853 fn default() -> Self {
854 Self::new(num_cpus::get())
855 }
856}
857
/// Byte-level helpers for IRI validation and string comparison.
///
/// With the `simd` feature, `validate_iri_fast` checks 32-byte chunks using
/// `wide` vector comparisons; otherwise a scalar loop is used. Both builds
/// accept and reject exactly the same inputs.
pub mod simd {
    #[cfg(feature = "simd")]
    use wide::{u8x32, CmpEq};

    /// ASCII characters that may not appear in an IRI (the N-Triples
    /// `IRIREF` production excludes them).
    const FORBIDDEN_IRI_BYTES: [u8; 10] =
        [b'<', b'>', b'"', b'{', b'}', b'|', b'\\', b'^', b'`', b' '];

    /// `true` for an ASCII control byte (`0x00..=0x1F`, `0x7F`) or one of
    /// [`FORBIDDEN_IRI_BYTES`].
    fn is_forbidden_ascii(byte: u8) -> bool {
        byte.is_ascii_control() || FORBIDDEN_IRI_BYTES.contains(&byte)
    }

    /// `true` if the UTF-8 bytes contain a C1 control character
    /// (U+0080..=U+009F).
    ///
    /// Those code points are encoded as `0xC2` followed by `0x80..=0x9F`.
    /// Matching the byte pair, rather than any single byte in `0x80..=0x9F`,
    /// avoids rejecting continuation bytes of ordinary characters such as `€`
    /// (`E2 82 AC`) or `Ā` (`C4 80`).
    fn contains_c1_control(bytes: &[u8]) -> bool {
        bytes
            .windows(2)
            .any(|pair| pair[0] == 0xC2 && (0x80..=0x9F).contains(&pair[1]))
    }

    /// Quick IRI sanity check.
    ///
    /// Rejects:
    /// - the empty string;
    /// - any forbidden ASCII character (see [`FORBIDDEN_IRI_BYTES`]);
    /// - ASCII control characters;
    /// - C1 control characters.
    ///
    /// This is not a full RFC 3987 parser.
    #[cfg(feature = "simd")]
    pub fn validate_iri_fast(iri: &str) -> bool {
        if iri.is_empty() {
            return false;
        }

        let bytes = iri.as_bytes();
        let mut chunks = bytes.chunks_exact(32);
        for chunk in &mut chunks {
            let mut lanes = [0u8; 32];
            lanes.copy_from_slice(chunk);
            let data = u8x32::from(lanes);

            if FORBIDDEN_IRI_BYTES
                .iter()
                .any(|&forbidden| data.cmp_eq(u8x32::splat(forbidden)).any())
            {
                return false;
            }
            // Control characters form a range, so check them with a scalar pass.
            if chunk.iter().any(u8::is_ascii_control) {
                return false;
            }
        }

        if chunks.remainder().iter().copied().any(is_forbidden_ascii) {
            return false;
        }

        !contains_c1_control(bytes)
    }

    /// Quick IRI sanity check.
    ///
    /// Rejects:
    /// - the empty string;
    /// - any forbidden ASCII character (see [`FORBIDDEN_IRI_BYTES`]);
    /// - ASCII control characters;
    /// - C1 control characters.
    ///
    /// This is not a full RFC 3987 parser.
    #[cfg(not(feature = "simd"))]
    pub fn validate_iri_fast(iri: &str) -> bool {
        !iri.is_empty()
            && !iri.bytes().any(is_forbidden_ascii)
            && !contains_c1_control(iri.as_bytes())
    }

    /// Orders strings by byte length first, then byte-wise.
    ///
    /// This is a shortlex order, not `str::cmp`: `"b" < "ab"` because the
    /// shorter string sorts first.
    pub fn compare_strings_fast(a: &str, b: &str) -> std::cmp::Ordering {
        a.len()
            .cmp(&b.len())
            .then_with(|| a.as_bytes().cmp(b.as_bytes()))
    }
}
979
// Unit tests for the arena, borrowed term views, both graph implementations
// and the `simd` helpers.
#[cfg(test)]
mod tests {
    use super::*;

    // `alloc_str` returns an owned copy and bumps both counters.
    #[test]
    fn test_rdf_arena() {
        let arena = RdfArena::new();

        let s1 = arena.alloc_str("test string 1");
        let s2 = arena.alloc_str("test string 2");

        assert_eq!(s1, "test string 1");
        assert_eq!(s2, "test string 2");
        assert!(arena.allocated_bytes() > 0);
        assert_eq!(arena.allocation_count(), 2);
    }

    // A named-node view exposes its IRI and converts back to an owned term.
    #[test]
    fn test_term_ref() {
        let node = NamedNode::new("http://example.org/test").unwrap();
        let term_ref = TermRef::from_named_node(&node);

        assert!(term_ref.is_named_node());
        assert_eq!(term_ref.as_str(), "http://example.org/test");

        let owned = term_ref.to_owned().unwrap();
        assert!(owned.is_named_node());
    }

    // A simple-literal triple round-trips through `TripleRef`.
    #[test]
    fn test_triple_ref() {
        let subject = NamedNode::new("http://example.org/s").unwrap();
        let predicate = NamedNode::new("http://example.org/p").unwrap();
        let object = Literal::new("test object");
        let triple = Triple::new(subject, predicate, object);

        let triple_ref = TripleRef::from_triple(&triple);
        assert!(triple_ref.subject.is_named_node());
        assert!(triple_ref.predicate.is_named_node());
        assert!(triple_ref.object.is_literal());

        let owned = triple_ref.to_owned().unwrap();
        assert_eq!(owned, triple);
    }

    // Duplicate inserts are rejected and do not change the length.
    #[test]
    fn test_lock_free_graph() {
        let graph = LockFreeGraph::new();
        assert!(graph.is_empty());

        let subject = NamedNode::new("http://example.org/s").unwrap();
        let predicate = NamedNode::new("http://example.org/p").unwrap();
        let object = Literal::new("test object");
        let triple = Triple::new(subject, predicate, object);

        assert!(graph.insert(triple.clone()));
        // Second insert of the same triple reports "already present".
        assert!(!graph.insert(triple.clone()));
        assert_eq!(graph.len(), 1);
        assert!(graph.contains(&triple));
    }

    // Fully bound and subject-only queries both find the inserted triple.
    #[test]
    fn test_optimized_graph() {
        let graph = OptimizedGraph::new();

        let subject = NamedNode::new("http://example.org/s").unwrap();
        let predicate = NamedNode::new("http://example.org/p").unwrap();
        let object = Literal::new("test object");
        let triple = Triple::new(subject.clone(), predicate.clone(), object.clone());

        assert!(graph.insert(&triple));
        assert!(!graph.insert(&triple));
        let results = graph.query(
            Some(&Subject::NamedNode(subject.clone())),
            Some(&Predicate::NamedNode(predicate.clone())),
            Some(&Object::Literal(object.clone())),
        );
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], triple);

        // Subject-only pattern.
        let results = graph.query(Some(&Subject::NamedNode(subject)), None, None);
        assert_eq!(results.len(), 1);

        let stats = graph.stats();
        assert_eq!(stats.triple_count, 1);
    }

    // Forbidden characters and the empty string are rejected.
    #[test]
    fn test_simd_iri_validation() {
        assert!(simd::validate_iri_fast("http://example.org/test"));
        assert!(!simd::validate_iri_fast("http://example.org/<invalid>"));
        assert!(!simd::validate_iri_fast(""));
        assert!(!simd::validate_iri_fast(
            "http://example.org/test with spaces"
        ));
    }

    // Comparison orders by length first ("short" < "longer"), then by bytes.
    #[test]
    fn test_simd_string_comparison() {
        assert_eq!(
            simd::compare_strings_fast("abc", "abc"),
            std::cmp::Ordering::Equal
        );
        assert_eq!(
            simd::compare_strings_fast("abc", "def"),
            std::cmp::Ordering::Less
        );
        assert_eq!(
            simd::compare_strings_fast("def", "abc"),
            std::cmp::Ordering::Greater
        );
        assert_eq!(
            simd::compare_strings_fast("short", "longer"),
            std::cmp::Ordering::Less
        );
    }

    // `reset` zeroes both counters.
    #[test]
    fn test_arena_reset() {
        let arena = RdfArena::new();

        arena.alloc_str("test");
        assert!(arena.allocated_bytes() > 0);

        arena.reset();
        assert_eq!(arena.allocated_bytes(), 0);
        assert_eq!(arena.allocation_count(), 0);
    }

    // Ten threads insert distinct triples; each insert succeeds and the
    // shared counter sees all ten.
    #[test]
    fn test_concurrent_optimized_graph() {
        use std::sync::Arc;
        use std::thread;

        let graph = Arc::new(OptimizedGraph::new());
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let graph = Arc::clone(&graph);
                thread::spawn(move || {
                    let subject = NamedNode::new(format!("http://example.org/s{i}")).unwrap();
                    let predicate = NamedNode::new("http://example.org/p").unwrap();
                    let object = Literal::new(format!("object{i}"));
                    let triple = Triple::new(subject, predicate, object);

                    graph.insert(&triple)
                })
            })
            .collect();

        let results: Vec<bool> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        assert!(results.iter().all(|&inserted| inserted));

        let stats = graph.stats();
        assert_eq!(stats.triple_count, 10);
    }
}
1139
/// Fixed-capacity, zero-initialised byte buffer with a separate fill length.
///
/// `as_slice` exposes only the filled prefix. `as_mut_slice` exposes the
/// whole capacity, so callers can fill it directly and then call `set_len`.
pub struct ZeroCopyBuffer {
    data: Pin<Box<[u8]>>,
    len: usize,
}

impl ZeroCopyBuffer {
    /// Same as [`ZeroCopyBuffer::with_capacity`].
    pub fn new(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }

    /// Allocates `capacity` zeroed bytes with an empty fill length.
    pub fn with_capacity(capacity: usize) -> Self {
        ZeroCopyBuffer {
            data: Pin::new(vec![0u8; capacity].into_boxed_slice()),
            len: 0,
        }
    }

    /// The filled prefix `[0, len)`.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[..self.len]
    }

    /// The entire backing storage, regardless of `len`.
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
        &mut self.data[..]
    }

    /// Total number of bytes the buffer can hold.
    pub fn capacity(&self) -> usize {
        self.data.len()
    }

    /// Number of filled bytes.
    pub fn len(&self) -> usize {
        self.len
    }

    /// `true` when no bytes are filled.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Marks the buffer empty; the stored bytes are left untouched.
    pub fn clear(&mut self) {
        self.len = 0;
    }

    /// Alias for [`ZeroCopyBuffer::clear`].
    pub fn reset(&mut self) {
        self.clear();
    }

    /// Sets the fill length, e.g. after writing through `as_mut_slice`.
    ///
    /// # Panics
    /// Panics if `len` exceeds the capacity.
    pub fn set_len(&mut self, len: usize) {
        assert!(len <= self.capacity());
        self.len = len;
    }

    /// Appends as much of `data` as fits and returns the number of bytes
    /// copied.
    ///
    /// An empty `data` returns `Ok(0)`, following the `std::io::Write`
    /// convention. (Previously it was reported as a full buffer.)
    ///
    /// # Errors
    /// Returns `ErrorKind::WriteZero` when `data` is non-empty but no space is
    /// left.
    pub fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
        if data.is_empty() {
            return Ok(0);
        }

        let available = self.capacity() - self.len;
        let to_write = data.len().min(available);

        if to_write == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::WriteZero,
                "Buffer is full",
            ));
        }

        let start = self.len;
        self.data[start..start + to_write].copy_from_slice(&data[..to_write]);
        self.len += to_write;
        Ok(to_write)
    }
}
1226
/// Stateless wrapper around `simd_json` parsing, plus a `serde_json`
/// fallback that does not need a mutable buffer.
#[derive(Clone)]
pub struct SimdJsonProcessor;

impl SimdJsonProcessor {
    /// Creates a processor; the type carries no state.
    pub fn new() -> Self {
        SimdJsonProcessor
    }

    /// Parses `json` into a value that borrows from the buffer.
    ///
    /// The buffer is taken mutably because `simd_json` presumably rewrites it
    /// while parsing — do not rely on its contents afterwards (confirm against
    /// the simd_json docs).
    ///
    /// # Errors
    /// Returns the `simd_json::Error` for malformed input.
    pub fn parse<'a>(
        &mut self,
        json: &'a mut [u8],
    ) -> Result<simd_json::BorrowedValue<'a>, simd_json::Error> {
        simd_json::to_borrowed_value(json)
    }

    /// Parses a mutable string in place into a borrowing value.
    ///
    /// NOTE(review): `str::as_bytes_mut` requires the bytes to be valid UTF-8
    /// again before the borrow ends and the `str` is used. This safe wrapper
    /// relies on `simd_json` never leaving invalid UTF-8 in the buffer
    /// (including on error), which this code does not establish. As far as I
    /// know, simd_json's own `&mut str` entry points are `unsafe fn` for this
    /// reason. Confirm, or consider making this `unsafe` or parsing a copy.
    ///
    /// # Errors
    /// Returns the `simd_json::Error` for malformed input.
    pub fn parse_str<'a>(
        &mut self,
        json: &'a mut str,
    ) -> Result<simd_json::BorrowedValue<'a>, simd_json::Error> {
        // SAFETY: not established here — see NOTE(review) above.
        let bytes = unsafe { json.as_bytes_mut() };
        simd_json::to_borrowed_value(bytes)
    }

    /// Parses `json` into an owned value that does not borrow the buffer.
    ///
    /// # Errors
    /// Returns the `simd_json::Error` for malformed input.
    pub fn parse_owned(
        &mut self,
        json: &mut [u8],
    ) -> Result<simd_json::OwnedValue, simd_json::Error> {
        simd_json::to_owned_value(json)
    }

    /// Parses an immutable byte slice with `serde_json`.
    ///
    /// # Errors
    /// Returns the `serde_json::Error` for malformed input.
    pub fn parse_json(&self, json: &[u8]) -> Result<serde_json::Value, serde_json::Error> {
        serde_json::from_slice(json)
    }
}

impl Default for SimdJsonProcessor {
    fn default() -> Self {
        Self::new()
    }
}
1272}