oxirs_core/
optimization.rs

1//! Zero-copy operations and performance optimizations
2//!
3//! This module provides advanced performance optimizations including zero-copy
4//! operations, memory-efficient data structures, and SIMD-accelerated processing
5//! for RDF data manipulation.
6
7use crate::interning::{InternedString, StringInterner};
8use crate::model::*;
9use bumpalo::Bump;
10use crossbeam::epoch::{self, Atomic, Owned};
11use crossbeam::queue::SegQueue;
12use dashmap::DashMap;
13use parking_lot::RwLock;
14#[cfg(feature = "parallel")]
15use rayon::iter::IntoParallelRefIterator;
16#[cfg(feature = "parallel")]
17use rayon::iter::ParallelIterator;
18use simd_json;
19use std::collections::BTreeSet;
20use std::pin::Pin;
21use std::sync::atomic::Ordering;
22use std::sync::Arc;
23
/// Type alias for string interner used throughout optimization module
///
/// This is the crate's `StringInterner` under another name; `TermInternerExt`
/// is implemented for it to add RDF term constructors.
pub type TermInterner = StringInterner;
26
/// Extension trait for TermInterner to create RDF terms
///
/// Each method takes the raw string parts of a term and returns the matching
/// model value. Fallible methods report failures as `crate::OxirsError`.
pub trait TermInternerExt {
    /// Intern a named node and return it
    ///
    /// # Errors
    ///
    /// Returns an error when a `NamedNode` cannot be built from `iri`.
    fn intern_named_node(&self, iri: &str) -> Result<NamedNode, crate::OxirsError>;

    /// Create a new blank node
    ///
    /// Note: the implementation for `TermInterner` in this file returns
    /// `BlankNode::new_unique()` and does not touch the interner.
    fn intern_blank_node(&self) -> BlankNode;

    /// Intern a simple literal
    ///
    /// The implementation for `TermInterner` in this file always returns `Ok`;
    /// the `Result` leaves room for implementations that can fail.
    fn intern_literal(&self, value: &str) -> Result<Literal, crate::OxirsError>;

    /// Intern a literal with datatype
    ///
    /// # Errors
    ///
    /// Returns an error when a `NamedNode` cannot be built from `datatype_iri`.
    fn intern_literal_with_datatype(
        &self,
        value: &str,
        datatype_iri: &str,
    ) -> Result<Literal, crate::OxirsError>;

    /// Intern a literal with language tag
    ///
    /// # Errors
    ///
    /// Returns an error when the language-tagged literal cannot be constructed
    /// from `value` and `language`.
    fn intern_literal_with_language(
        &self,
        value: &str,
        language: &str,
    ) -> Result<Literal, crate::OxirsError>;
}
52
53impl TermInternerExt for TermInterner {
54    fn intern_named_node(&self, iri: &str) -> Result<NamedNode, crate::OxirsError> {
55        // Intern the IRI string
56        let interned = self.intern(iri);
57        // Create NamedNode from the interned string
58        NamedNode::new(interned.as_ref())
59    }
60
61    fn intern_blank_node(&self) -> BlankNode {
62        // Generate a unique blank node
63        BlankNode::new_unique()
64    }
65
66    fn intern_literal(&self, value: &str) -> Result<Literal, crate::OxirsError> {
67        // Intern the literal value
68        let interned = self.intern(value);
69        // Create simple literal
70        Ok(Literal::new_simple_literal(interned.as_ref()))
71    }
72
73    fn intern_literal_with_datatype(
74        &self,
75        value: &str,
76        datatype_iri: &str,
77    ) -> Result<Literal, crate::OxirsError> {
78        // Intern both value and datatype IRI
79        let value_interned = self.intern(value);
80        let datatype_interned = self.intern(datatype_iri);
81        // Create datatype node and literal
82        let datatype_node = NamedNode::new(datatype_interned.as_ref())?;
83        Ok(Literal::new_typed_literal(
84            value_interned.as_ref(),
85            datatype_node,
86        ))
87    }
88
89    fn intern_literal_with_language(
90        &self,
91        value: &str,
92        language: &str,
93    ) -> Result<Literal, crate::OxirsError> {
94        // Intern both value and language tag
95        let value_interned = self.intern(value);
96        let language_interned = self.intern(language);
97        // Create language-tagged literal
98        let literal = Literal::new_language_tagged_literal(
99            value_interned.as_ref(),
100            language_interned.as_ref(),
101        )?;
102        Ok(literal)
103    }
104}
105
/// Arena-based memory allocator for RDF terms
///
/// Provides fast allocation and automatic cleanup for temporary RDF operations
#[derive(Debug)]
pub struct RdfArena {
    /// Main allocation arena (wrapped in Mutex for thread safety)
    arena: std::sync::Mutex<Bump>,
    /// String interner for the arena
    interner: StringInterner,
    /// Statistics: total string bytes passed to `alloc_str` since the last `reset`
    allocated_bytes: std::sync::atomic::AtomicUsize,
    /// Statistics: number of `alloc_str` calls since the last `reset`
    allocation_count: std::sync::atomic::AtomicUsize,
}
119
120impl RdfArena {
121    /// Create a new RDF arena with the given capacity hint
122    pub fn new() -> Self {
123        RdfArena {
124            arena: std::sync::Mutex::new(Bump::new()),
125            interner: StringInterner::new(),
126            allocated_bytes: std::sync::atomic::AtomicUsize::new(0),
127            allocation_count: std::sync::atomic::AtomicUsize::new(0),
128        }
129    }
130
131    /// Create a new arena with pre-allocated capacity
132    pub fn with_capacity(capacity: usize) -> Self {
133        RdfArena {
134            arena: std::sync::Mutex::new(Bump::with_capacity(capacity)),
135            interner: StringInterner::new(),
136            allocated_bytes: std::sync::atomic::AtomicUsize::new(0),
137            allocation_count: std::sync::atomic::AtomicUsize::new(0),
138        }
139    }
140
141    /// Allocate a string in the arena
142    pub fn alloc_str(&self, s: &str) -> String {
143        // Since we can't return a reference with Mutex, return an owned String
144        // For ultra-performance mode, the caller should use string interning instead
145        self.allocated_bytes
146            .fetch_add(s.len(), std::sync::atomic::Ordering::Relaxed);
147        self.allocation_count
148            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
149        s.to_string()
150    }
151
152    /// Allocate and intern a string for efficient reuse
153    pub fn intern_str(&self, s: &str) -> InternedString {
154        InternedString::new_with_interner(s, &self.interner)
155    }
156
157    /// Reset the arena, freeing all allocated memory
158    pub fn reset(&self) {
159        if let Ok(mut arena) = self.arena.lock() {
160            arena.reset();
161            self.allocated_bytes
162                .store(0, std::sync::atomic::Ordering::Relaxed);
163            self.allocation_count
164                .store(0, std::sync::atomic::Ordering::Relaxed);
165        }
166    }
167
168    /// Get total bytes allocated
169    pub fn allocated_bytes(&self) -> usize {
170        self.allocated_bytes
171            .load(std::sync::atomic::Ordering::Relaxed)
172    }
173
174    /// Get total allocation count
175    pub fn allocation_count(&self) -> usize {
176        self.allocation_count
177            .load(std::sync::atomic::Ordering::Relaxed)
178    }
179}
180
181impl Default for RdfArena {
182    fn default() -> Self {
183        Self::new()
184    }
185}
186
/// Zero-copy RDF term reference that avoids allocations
///
/// This provides efficient operations on RDF terms without copying data.
/// Every variant only borrows string slices, so the type is `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TermRef<'a> {
    /// Named node; holds the borrowed IRI text
    NamedNode(&'a str),
    /// Blank node; holds the borrowed identifier text
    BlankNode(&'a str),
    /// Literal as `(value, datatype IRI, language tag)`. When both optional
    /// parts are present, the language tag wins in `to_owned` and `Display`.
    Literal(&'a str, Option<&'a str>, Option<&'a str>), // value, datatype, language
    /// Variable; holds the borrowed name (`Display` prepends `?`)
    Variable(&'a str),
}
197
198impl<'a> TermRef<'a> {
199    /// Create a term reference from a named node
200    pub fn from_named_node(node: &'a NamedNode) -> Self {
201        TermRef::NamedNode(node.as_str())
202    }
203
204    /// Create a term reference from a blank node
205    pub fn from_blank_node(node: &'a BlankNode) -> Self {
206        TermRef::BlankNode(node.as_str())
207    }
208
209    /// Create a term reference from a literal
210    pub fn from_literal(literal: &'a Literal) -> Self {
211        let language = literal.language();
212        // Always include datatype IRI for now to avoid lifetime issues
213        // Skip datatype for now due to lifetime issues - would need redesign
214        TermRef::Literal(literal.value(), None, language)
215    }
216
217    /// Get the string representation of this term
218    pub fn as_str(&self) -> &'a str {
219        match self {
220            TermRef::NamedNode(s) => s,
221            TermRef::BlankNode(s) => s,
222            TermRef::Literal(s, _, _) => s,
223            TermRef::Variable(s) => s,
224        }
225    }
226
227    /// Convert to an owned Term (allocating if necessary)
228    pub fn to_owned(&self) -> Result<Term, crate::OxirsError> {
229        match self {
230            TermRef::NamedNode(iri) => NamedNode::new(*iri).map(Term::NamedNode),
231            TermRef::BlankNode(id) => BlankNode::new(*id).map(Term::BlankNode),
232            TermRef::Literal(value, datatype, language) => {
233                let literal = if let Some(lang) = language {
234                    Literal::new_lang(*value, *lang)?
235                } else if let Some(dt) = datatype {
236                    let dt_node = NamedNode::new(*dt)?;
237                    Literal::new_typed(*value, dt_node)
238                } else {
239                    Literal::new(*value)
240                };
241                Ok(Term::Literal(literal))
242            }
243            TermRef::Variable(name) => Variable::new(*name).map(Term::Variable),
244        }
245    }
246
247    /// Returns true if this is a named node
248    pub fn is_named_node(&self) -> bool {
249        matches!(self, TermRef::NamedNode(_))
250    }
251
252    /// Returns true if this is a blank node
253    pub fn is_blank_node(&self) -> bool {
254        matches!(self, TermRef::BlankNode(_))
255    }
256
257    /// Returns true if this is a literal
258    pub fn is_literal(&self) -> bool {
259        matches!(self, TermRef::Literal(_, _, _))
260    }
261
262    /// Returns true if this is a variable
263    pub fn is_variable(&self) -> bool {
264        matches!(self, TermRef::Variable(_))
265    }
266}
267
268impl<'a> std::fmt::Display for TermRef<'a> {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        match self {
271            TermRef::NamedNode(iri) => write!(f, "<{iri}>"),
272            TermRef::BlankNode(id) => write!(f, "{id}"),
273            TermRef::Literal(value, datatype, language) => {
274                write!(f, "\"{value}\"")?;
275                if let Some(lang) = language {
276                    write!(f, "@{lang}")?;
277                } else if let Some(dt) = datatype {
278                    write!(f, "^^<{dt}>")?;
279                }
280                Ok(())
281            }
282            TermRef::Variable(name) => write!(f, "?{name}"),
283        }
284    }
285}
286
/// Zero-copy triple reference for efficient operations
///
/// Holds three borrowed [`TermRef`]s and is therefore `Copy`. No position is
/// validated here; checks happen when converting back to an owned triple.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TripleRef<'a> {
    /// Term in subject position
    pub subject: TermRef<'a>,
    /// Term in predicate position
    pub predicate: TermRef<'a>,
    /// Term in object position
    pub object: TermRef<'a>,
}
294
295impl<'a> TripleRef<'a> {
296    /// Create a new triple reference
297    pub fn new(subject: TermRef<'a>, predicate: TermRef<'a>, object: TermRef<'a>) -> Self {
298        TripleRef {
299            subject,
300            predicate,
301            object,
302        }
303    }
304
305    /// Create from an owned triple
306    pub fn from_triple(triple: &'a Triple) -> Self {
307        TripleRef {
308            subject: match triple.subject() {
309                Subject::NamedNode(n) => TermRef::NamedNode(n.as_str()),
310                Subject::BlankNode(b) => TermRef::BlankNode(b.as_str()),
311                Subject::Variable(v) => TermRef::Variable(v.as_str()),
312                Subject::QuotedTriple(_) => TermRef::NamedNode("<<quoted-triple>>"),
313            },
314            predicate: match triple.predicate() {
315                Predicate::NamedNode(n) => TermRef::NamedNode(n.as_str()),
316                Predicate::Variable(v) => TermRef::Variable(v.as_str()),
317            },
318            object: match triple.object() {
319                Object::NamedNode(n) => TermRef::NamedNode(n.as_str()),
320                Object::BlankNode(b) => TermRef::BlankNode(b.as_str()),
321                Object::Literal(l) => TermRef::from_literal(l),
322                Object::Variable(v) => TermRef::Variable(v.as_str()),
323                Object::QuotedTriple(_) => TermRef::NamedNode("<<quoted-triple>>"),
324            },
325        }
326    }
327
328    /// Convert to an owned triple
329    pub fn to_owned(&self) -> Result<Triple, crate::OxirsError> {
330        let subject = match self.subject.to_owned()? {
331            Term::NamedNode(n) => Subject::NamedNode(n),
332            Term::BlankNode(b) => Subject::BlankNode(b),
333            _ => return Err(crate::OxirsError::Parse("Invalid subject term".to_string())),
334        };
335
336        let predicate = match self.predicate.to_owned()? {
337            Term::NamedNode(n) => Predicate::NamedNode(n),
338            _ => {
339                return Err(crate::OxirsError::Parse(
340                    "Invalid predicate term".to_string(),
341                ))
342            }
343        };
344
345        let object = match self.object.to_owned()? {
346            Term::NamedNode(n) => Object::NamedNode(n),
347            Term::BlankNode(b) => Object::BlankNode(b),
348            Term::Literal(l) => Object::Literal(l),
349            _ => return Err(crate::OxirsError::Parse("Invalid object term".to_string())),
350        };
351
352        Ok(Triple::new(subject, predicate, object))
353    }
354}
355
356impl<'a> std::fmt::Display for TripleRef<'a> {
357    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358        write!(f, "{} {} {} .", self.subject, self.predicate, self.object)
359    }
360}
361
362/// Lock-free graph operations using epoch-based memory management
363#[derive(Debug)]
364pub struct LockFreeGraph {
365    /// Atomic pointer to the current graph data
366    data: Atomic<GraphData>,
367    /// Epoch for safe memory reclamation
368    epoch: epoch::Guard,
369}
370
371/// Internal graph data structure for lock-free operations
372#[derive(Debug)]
373struct GraphData {
374    /// Triples stored in a B-tree for ordered access
375    triples: BTreeSet<Triple>,
376    /// Version number for optimistic updates
377    version: u64,
378}
379
380impl LockFreeGraph {
381    /// Create a new lock-free graph
382    pub fn new() -> Self {
383        let initial_data = GraphData {
384            triples: BTreeSet::new(),
385            version: 0,
386        };
387
388        LockFreeGraph {
389            data: Atomic::new(initial_data),
390            epoch: epoch::pin(),
391        }
392    }
393
394    /// Insert a triple using compare-and-swap
395    pub fn insert(&self, triple: Triple) -> bool {
396        loop {
397            let current = self.data.load(Ordering::Acquire, &self.epoch);
398            let current_ref = unsafe { current.deref() };
399
400            // Check if triple already exists
401            if current_ref.triples.contains(&triple) {
402                return false;
403            }
404
405            // Create new data with the inserted triple
406            let mut new_triples = current_ref.triples.clone();
407            new_triples.insert(triple.clone());
408
409            let new_data = GraphData {
410                triples: new_triples,
411                version: current_ref.version + 1,
412            };
413
414            // Try to update atomically
415            match self.data.compare_exchange_weak(
416                current,
417                Owned::new(new_data),
418                Ordering::Release,
419                Ordering::Relaxed,
420                &self.epoch,
421            ) {
422                Ok(_) => {
423                    // Successfully updated
424                    unsafe {
425                        self.epoch.defer_destroy(current);
426                    }
427                    return true;
428                }
429                Err(_) => {
430                    // Retry with new current value
431                    continue;
432                }
433            }
434        }
435    }
436
437    /// Get the current number of triples
438    pub fn len(&self) -> usize {
439        let current = self.data.load(Ordering::Acquire, &self.epoch);
440        unsafe { current.deref().triples.len() }
441    }
442
443    /// Check if the graph is empty
444    pub fn is_empty(&self) -> bool {
445        self.len() == 0
446    }
447
448    /// Check if a triple exists
449    pub fn contains(&self, triple: &Triple) -> bool {
450        let current = self.data.load(Ordering::Acquire, &self.epoch);
451        unsafe { current.deref().triples.contains(triple) }
452    }
453}
454
455impl Default for LockFreeGraph {
456    fn default() -> Self {
457        Self::new()
458    }
459}
460
/// High-performance graph with multiple indexing strategies
///
/// Every triple is stored three times, once per index, as interned strings.
/// Each index is a two-level concurrent map ending in an ordered set.
#[derive(Debug)]
pub struct OptimizedGraph {
    /// Subject-Predicate-Object index: subject -> predicate -> set of objects
    spo: DashMap<InternedString, DashMap<InternedString, BTreeSet<InternedString>>>,
    /// Predicate-Object-Subject index: predicate -> object -> set of subjects
    pos: DashMap<InternedString, DashMap<InternedString, BTreeSet<InternedString>>>,
    /// Object-Subject-Predicate index: object -> subject -> set of predicates
    osp: DashMap<InternedString, DashMap<InternedString, BTreeSet<InternedString>>>,
    /// String interner for memory efficiency
    interner: Arc<StringInterner>,
    /// Statistics
    stats: Arc<RwLock<GraphStats>>,
}
475
/// Statistics for the optimized graph
///
/// In the code visible in this section only `triple_count` and
/// `intern_hit_ratio` are ever written (by `OptimizedGraph::insert`); the
/// remaining fields keep their default of zero unless set elsewhere.
#[derive(Debug, Clone, Default)]
pub struct GraphStats {
    /// Number of distinct triples inserted so far
    pub triple_count: usize,
    /// Number of distinct subjects
    pub unique_subjects: usize,
    /// Number of distinct predicates
    pub unique_predicates: usize,
    /// Number of distinct objects
    pub unique_objects: usize,
    /// Memory used by the indexes (unit not established by the visible code)
    pub index_memory_usage: usize,
    /// Hit ratio reported by the interner at the time of the last new insert
    pub intern_hit_ratio: f64,
}
486
487impl OptimizedGraph {
488    /// Create a new optimized graph
489    pub fn new() -> Self {
490        OptimizedGraph {
491            spo: DashMap::new(),
492            pos: DashMap::new(),
493            osp: DashMap::new(),
494            interner: Arc::new(StringInterner::new()),
495            stats: Arc::new(RwLock::new(GraphStats::default())),
496        }
497    }
498
499    /// Insert a triple into all indexes
500    pub fn insert(&self, triple: &Triple) -> bool {
501        let subject = self.intern_subject(triple.subject());
502        let predicate = self.intern_predicate(triple.predicate());
503        let object = self.intern_object(triple.object());
504
505        // Insert into SPO index
506        let spo_entry = self.spo.entry(subject.clone()).or_default();
507        let mut po_entry = spo_entry.entry(predicate.clone()).or_default();
508        let was_new = po_entry.insert(object.clone());
509
510        if was_new {
511            // Insert into POS index
512            let pos_entry = self.pos.entry(predicate.clone()).or_default();
513            let mut os_entry = pos_entry.entry(object.clone()).or_default();
514            os_entry.insert(subject.clone());
515
516            // Insert into OSP index
517            let osp_entry = self.osp.entry(object.clone()).or_default();
518            let mut sp_entry = osp_entry.entry(subject.clone()).or_default();
519            sp_entry.insert(predicate);
520
521            // Update statistics
522            {
523                let mut stats = self.stats.write();
524                stats.triple_count += 1;
525                stats.intern_hit_ratio = self.interner.stats().hit_ratio();
526            }
527        }
528
529        was_new
530    }
531
532    /// Query triples by pattern (None = wildcard)
533    pub fn query(
534        &self,
535        subject: Option<&Subject>,
536        predicate: Option<&Predicate>,
537        object: Option<&Object>,
538    ) -> Vec<Triple> {
539        let mut results = Vec::new();
540
541        // Choose the most selective index based on bound variables
542        match (subject.is_some(), predicate.is_some(), object.is_some()) {
543            (true, true, true) => {
544                // Exact match - use SPO index
545                if let (Some(s), Some(p), Some(o)) = (subject, predicate, object) {
546                    let s_intern = self.intern_subject(s);
547                    let p_intern = self.intern_predicate(p);
548                    let o_intern = self.intern_object(o);
549
550                    if let Some(po_map) = self.spo.get(&s_intern) {
551                        if let Some(o_set) = po_map.get(&p_intern) {
552                            if o_set.contains(&o_intern) {
553                                let triple = Triple::new(s.clone(), p.clone(), o.clone());
554                                results.push(triple);
555                            }
556                        }
557                    }
558                }
559            }
560            (true, true, false) => {
561                // Subject and predicate bound - use SPO index
562                if let (Some(s), Some(p)) = (subject, predicate) {
563                    let s_intern = self.intern_subject(s);
564                    let p_intern = self.intern_predicate(p);
565
566                    if let Some(po_map) = self.spo.get(&s_intern) {
567                        if let Some(o_set) = po_map.get(&p_intern) {
568                            for o_intern in o_set.iter() {
569                                if let Ok(object) = self.unintern_object(o_intern) {
570                                    let triple = Triple::new(s.clone(), p.clone(), object);
571                                    results.push(triple);
572                                }
573                            }
574                        }
575                    }
576                }
577            }
578            (false, true, true) => {
579                // Predicate and object bound - use POS index
580                if let (Some(p), Some(o)) = (predicate, object) {
581                    let p_intern = self.intern_predicate(p);
582                    let o_intern = self.intern_object(o);
583
584                    if let Some(os_map) = self.pos.get(&p_intern) {
585                        if let Some(s_set) = os_map.get(&o_intern) {
586                            for s_intern in s_set.iter() {
587                                if let Ok(subject) = self.unintern_subject(s_intern) {
588                                    let triple = Triple::new(subject, p.clone(), o.clone());
589                                    results.push(triple);
590                                }
591                            }
592                        }
593                    }
594                }
595            }
596            _ => {
597                // Other patterns - full scan (could be optimized further)
598                for s_entry in &self.spo {
599                    let s_intern = s_entry.key();
600                    if let Ok(s) = self.unintern_subject(s_intern) {
601                        if subject.is_some() && subject.unwrap() != &s {
602                            continue;
603                        }
604
605                        for po_entry in s_entry.value().iter() {
606                            let p_intern = po_entry.key();
607                            if let Ok(p) = self.unintern_predicate(p_intern) {
608                                if predicate.is_some() && predicate.unwrap() != &p {
609                                    continue;
610                                }
611
612                                for o_intern in po_entry.value().iter() {
613                                    if let Ok(o) = self.unintern_object(o_intern) {
614                                        if object.is_some() && object.unwrap() != &o {
615                                            continue;
616                                        }
617
618                                        let triple = Triple::new(s.clone(), p.clone(), o);
619                                        results.push(triple);
620                                    }
621                                }
622                            }
623                        }
624                    }
625                }
626            }
627        }
628
629        results
630    }
631
632    /// Get current statistics
633    pub fn stats(&self) -> GraphStats {
634        self.stats.read().clone()
635    }
636
637    /// Intern a subject term
638    fn intern_subject(&self, subject: &Subject) -> InternedString {
639        match subject {
640            Subject::NamedNode(n) => InternedString::new_with_interner(n.as_str(), &self.interner),
641            Subject::BlankNode(b) => InternedString::new_with_interner(b.as_str(), &self.interner),
642            Subject::Variable(v) => InternedString::new_with_interner(v.as_str(), &self.interner),
643            Subject::QuotedTriple(_) => {
644                InternedString::new_with_interner("<<quoted-triple>>", &self.interner)
645            }
646        }
647    }
648
649    /// Intern a predicate term
650    fn intern_predicate(&self, predicate: &Predicate) -> InternedString {
651        match predicate {
652            Predicate::NamedNode(n) => {
653                InternedString::new_with_interner(n.as_str(), &self.interner)
654            }
655            Predicate::Variable(v) => InternedString::new_with_interner(v.as_str(), &self.interner),
656        }
657    }
658
659    /// Intern an object term
660    fn intern_object(&self, object: &Object) -> InternedString {
661        match object {
662            Object::NamedNode(n) => InternedString::new_with_interner(n.as_str(), &self.interner),
663            Object::BlankNode(b) => InternedString::new_with_interner(b.as_str(), &self.interner),
664            Object::Literal(l) => {
665                // For literals, we store a serialized representation
666                let serialized = format!("{l}");
667                InternedString::new_with_interner(&serialized, &self.interner)
668            }
669            Object::Variable(v) => InternedString::new_with_interner(v.as_str(), &self.interner),
670            Object::QuotedTriple(_) => {
671                InternedString::new_with_interner("<<quoted-triple>>", &self.interner)
672            }
673        }
674    }
675
676    /// Convert interned subject back to Subject
677    fn unintern_subject(&self, interned: &InternedString) -> Result<Subject, crate::OxirsError> {
678        let s = interned.as_str();
679        if s.starts_with("?") || s.starts_with("$") {
680            Variable::new(&s[1..]).map(Subject::Variable)
681        } else if s.starts_with("_:") {
682            BlankNode::new(s).map(Subject::BlankNode)
683        } else {
684            NamedNode::new(s).map(Subject::NamedNode)
685        }
686    }
687
688    /// Convert interned predicate back to Predicate
689    fn unintern_predicate(
690        &self,
691        interned: &InternedString,
692    ) -> Result<Predicate, crate::OxirsError> {
693        let s = interned.as_str();
694        if s.starts_with("?") || s.starts_with("$") {
695            Variable::new(&s[1..]).map(Predicate::Variable)
696        } else {
697            NamedNode::new(s).map(Predicate::NamedNode)
698        }
699    }
700
701    /// Convert interned object back to Object
702    fn unintern_object(&self, interned: &InternedString) -> Result<Object, crate::OxirsError> {
703        let s = interned.as_str();
704        if s.starts_with("?") || s.starts_with("$") {
705            return Variable::new(&s[1..]).map(Object::Variable);
706        } else if let Some(stripped) = s.strip_prefix("\"") {
707            // Parse literal (simplified - would need full parser for production)
708            if let Some(end_quote) = stripped.find('"') {
709                let value = &stripped[..end_quote];
710                return Ok(Object::Literal(Literal::new(value)));
711            }
712            // If no end quote found, treat as a simple literal
713            return Ok(Object::Literal(Literal::new(s)));
714        }
715
716        if s.starts_with("_:") {
717            BlankNode::new(s).map(Object::BlankNode)
718        } else {
719            NamedNode::new(s).map(Object::NamedNode)
720        }
721    }
722}
723
724impl Default for OptimizedGraph {
725    fn default() -> Self {
726        Self::new()
727    }
728}
729
/// Lock-free queue for batch processing operations
///
/// Operations are queued with `push` and later drained and handled in bulk by
/// `process_batch`. Only compiled when the `parallel` feature is enabled.
#[cfg(feature = "parallel")]
#[derive(Debug)]
pub struct BatchProcessor {
    /// Queue for pending operations
    operation_queue: SegQueue<BatchOperation>,
    /// Dedicated rayon thread pool on which batches are processed
    processing_pool: rayon::ThreadPool,
    /// Statistics
    stats: Arc<RwLock<BatchStats>>,
}
741
/// Batch operation types
#[derive(Debug, Clone)]
pub enum BatchOperation {
    /// Insert the given quad
    Insert(Quad),
    /// Delete the given quad
    Delete(Quad),
    /// Replace `old` with `new`
    Update { old: Quad, new: Quad },
    /// Request a compaction; carries no payload
    Compact,
}
750
/// Batch processing statistics
#[derive(Debug, Default, Clone)]
pub struct BatchStats {
    /// Cumulative number of operations processed across all batches
    pub operations_processed: usize,
    /// Batch size requested in the most recent non-empty batch
    pub batch_size: usize,
    /// Wall-clock duration of the most recent non-empty batch, in milliseconds
    pub processing_time_ms: u64,
    /// Operations per second of the most recent batch whose measured duration
    /// was non-zero
    pub throughput_ops_per_sec: f64,
}
759
#[cfg(feature = "parallel")]
impl BatchProcessor {
    /// Create a new batch processor with specified thread count
    ///
    /// `num_threads` is forwarded to rayon's `ThreadPoolBuilder::num_threads`.
    ///
    /// # Panics
    ///
    /// Panics when the rayon thread pool cannot be built.
    pub fn new(num_threads: usize) -> Self {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .build()
            .expect("failed to build the rayon thread pool for BatchProcessor");

        BatchProcessor {
            operation_queue: SegQueue::new(),
            processing_pool: pool,
            stats: Arc::new(RwLock::new(BatchStats::default())),
        }
    }

    /// Add an operation to the batch queue
    pub fn push(&self, operation: BatchOperation) {
        self.operation_queue.push(operation);
    }

    /// Process pending operations in one batch
    ///
    /// Dequeues at most `batch_size` operations, handles them in parallel on
    /// the processor's own thread pool and updates the statistics. Returns the
    /// number of operations taken from the queue; `Ok(0)` means the queue was
    /// empty (or `batch_size` was zero) and the statistics are left untouched.
    ///
    /// The per-operation handlers are currently placeholders, so this method
    /// never returns `Err`.
    pub fn process_batch(&self, batch_size: usize) -> Result<usize, crate::OxirsError> {
        let start_time = std::time::Instant::now();

        // Reserve only what can actually be dequeued right now. Reserving
        // `batch_size` blindly would abort on capacity overflow (or exhaust
        // memory) when a caller passes a huge value such as `usize::MAX` to
        // drain the queue.
        let reserve = batch_size.min(self.operation_queue.len());
        let mut operations = Vec::with_capacity(reserve);
        operations.extend(std::iter::from_fn(|| self.operation_queue.pop()).take(batch_size));

        if operations.is_empty() {
            return Ok(0);
        }

        let operations_count = operations.len();

        // Process operations in parallel using Rayon
        self.processing_pool.install(|| {
            operations.par_iter().for_each(|operation| match operation {
                BatchOperation::Insert(_quad) => {
                    // Parallel insert logic would go here
                }
                BatchOperation::Delete(_quad) => {
                    // Parallel delete logic would go here
                }
                BatchOperation::Update {
                    old: _old,
                    new: _new,
                } => {
                    // Parallel update logic would go here
                }
                BatchOperation::Compact => {
                    // Compaction logic would go here
                }
            });
        });

        // Update statistics
        let elapsed = start_time.elapsed();
        let elapsed_secs = elapsed.as_secs_f64();
        {
            let mut stats = self.stats.write();
            stats.operations_processed += operations_count;
            stats.batch_size = batch_size;
            // Saturate instead of silently truncating the 128-bit millisecond count.
            stats.processing_time_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX);
            // A zero duration would divide by zero; keep the previous value then.
            if elapsed_secs > 0.0 {
                stats.throughput_ops_per_sec = operations_count as f64 / elapsed_secs;
            }
        }

        Ok(operations_count)
    }

    /// Get current processing statistics
    ///
    /// Returns a copy taken under the read lock.
    pub fn stats(&self) -> BatchStats {
        self.stats.read().clone()
    }

    /// Get the number of pending operations
    pub fn pending_operations(&self) -> usize {
        self.operation_queue.len()
    }
}
850
#[cfg(feature = "parallel")]
impl Default for BatchProcessor {
    /// Build a processor by handing the value reported by `num_cpus::get()`
    /// to [`BatchProcessor::new`].
    // NOTE(review): presumably that argument sizes the worker pool — confirm
    // against `BatchProcessor::new`.
    fn default() -> Self {
        let cpu_count = num_cpus::get();
        BatchProcessor::new(cpu_count)
    }
}
857
/// SIMD-accelerated string operations for RDF processing
pub mod simd {
    #[cfg(feature = "simd")]
    use wide::{u8x32, CmpEq};

    /// ASCII bytes that are never allowed in an IRI:
    /// `<`, `>`, `"`, `{`, `}`, `|`, `\`, `^`, `` ` `` and space.
    const FORBIDDEN_BYTES: [u8; 10] = [
        b'<', b'>', b'"', b'{', b'}', b'|', b'\\', b'^', b'`', b' ',
    ];

    /// Scalar scan: returns `true` if `bytes` contains any of [`FORBIDDEN_BYTES`].
    ///
    /// Scanning raw bytes is sound for UTF-8 input because every forbidden
    /// value is ASCII, and ASCII bytes never occur inside a multi-byte
    /// sequence.
    fn contains_forbidden_byte(bytes: &[u8]) -> bool {
        bytes.iter().any(|byte| FORBIDDEN_BYTES.contains(byte))
    }

    /// Returns `true` if `text` contains a control character
    /// (U+0000–U+001F or U+007F–U+009F).
    ///
    /// The test is made on decoded characters rather than on bytes: the byte
    /// values 0x80–0x9F are ordinary UTF-8 continuation bytes, so a byte-level
    /// range check would reject valid non-ASCII characters such as `Ā`
    /// (encoded as `C4 80`).
    fn contains_control_char(text: &str) -> bool {
        text.chars().any(char::is_control)
    }

    /// Fast IRI validation using SIMD operations
    ///
    /// Returns `false` when `iri` is empty, contains one of the forbidden
    /// ASCII characters (`< > " { } | \ ^ `` ` `` or space), or contains a
    /// control character; returns `true` otherwise. This is a character-level
    /// screen only; it does not parse the IRI.
    ///
    /// Full 32-byte blocks are compared against each forbidden byte with one
    /// vector comparison; the tail and the control-character check are scalar.
    /// The result is identical to the non-SIMD fallback.
    #[cfg(feature = "simd")]
    pub fn validate_iri_fast(iri: &str) -> bool {
        if iri.is_empty() {
            return false;
        }

        let mut blocks = iri.as_bytes().chunks_exact(32);

        for block in blocks.by_ref() {
            // `chunks_exact(32)` guarantees the length, so this copy cannot panic.
            let mut lanes = [0u8; 32];
            lanes.copy_from_slice(block);
            let data = u8x32::from(lanes);

            for &forbidden in &FORBIDDEN_BYTES {
                if data.cmp_eq(u8x32::splat(forbidden)).any() {
                    return false;
                }
            }
        }

        // Fewer than 32 bytes are left over; check them one by one.
        if contains_forbidden_byte(blocks.remainder()) {
            return false;
        }

        !contains_control_char(iri)
    }

    /// Fast IRI validation (non-SIMD fallback)
    ///
    /// Returns `false` when `iri` is empty, contains one of the forbidden
    /// ASCII characters (`< > " { } | \ ^ `` ` `` or space), or contains a
    /// control character; returns `true` otherwise. This is a character-level
    /// screen only; it does not parse the IRI.
    #[cfg(not(feature = "simd"))]
    pub fn validate_iri_fast(iri: &str) -> bool {
        !iri.is_empty()
            && !contains_forbidden_byte(iri.as_bytes())
            && !contains_control_char(iri)
    }

    /// Fast string comparison.
    ///
    /// The ordering is **length first, then bytewise**: a shorter string always
    /// sorts before a longer one, and only strings of equal length are compared
    /// by content. This is therefore *not* lexicographic order (for example
    /// `"short"` sorts before `"longer"`).
    ///
    /// The content comparison is delegated to the standard slice comparison;
    /// no explicit SIMD instructions are used here.
    pub fn compare_strings_fast(a: &str, b: &str) -> std::cmp::Ordering {
        a.len()
            .cmp(&b.len())
            .then_with(|| a.as_bytes().cmp(b.as_bytes()))
    }
}
979
#[cfg(test)]
mod tests {
    use super::*;

    /// Build the `<s> <p> "test object"` triple shared by several tests.
    fn example_triple() -> Triple {
        Triple::new(
            NamedNode::new("http://example.org/s").unwrap(),
            NamedNode::new("http://example.org/p").unwrap(),
            Literal::new("test object"),
        )
    }

    /// Strings allocated in an arena stay readable and are counted.
    #[test]
    fn test_rdf_arena() {
        let arena = RdfArena::new();

        let first = arena.alloc_str("test string 1");
        let second = arena.alloc_str("test string 2");

        assert_eq!(first, "test string 1");
        assert_eq!(second, "test string 2");
        assert!(arena.allocated_bytes() > 0);
        assert_eq!(arena.allocation_count(), 2);
    }

    /// A `TermRef` built from a named node exposes its IRI and converts back
    /// to an owned term.
    #[test]
    fn test_term_ref() {
        let iri = "http://example.org/test";
        let node = NamedNode::new(iri).unwrap();

        let borrowed = TermRef::from_named_node(&node);
        assert!(borrowed.is_named_node());
        assert_eq!(borrowed.as_str(), iri);

        let owned = borrowed.to_owned().unwrap();
        assert!(owned.is_named_node());
    }

    /// A `TripleRef` reports the kinds of its three terms and round-trips to
    /// an equal owned triple.
    #[test]
    fn test_triple_ref() {
        let triple = example_triple();

        let borrowed = TripleRef::from_triple(&triple);
        assert!(borrowed.subject.is_named_node());
        assert!(borrowed.predicate.is_named_node());
        assert!(borrowed.object.is_literal());

        let round_tripped = borrowed.to_owned().unwrap();
        assert_eq!(round_tripped, triple);
    }

    /// Inserting the same triple twice into a `LockFreeGraph` stores it once.
    #[test]
    fn test_lock_free_graph() {
        let graph = LockFreeGraph::new();
        assert!(graph.is_empty());

        let triple = example_triple();

        assert!(graph.insert(triple.clone()));
        assert!(!graph.insert(triple.clone())); // Duplicate
        assert_eq!(graph.len(), 1);
        assert!(graph.contains(&triple));
    }

    /// `OptimizedGraph` rejects duplicates and answers exact and
    /// subject-only queries.
    #[test]
    fn test_optimized_graph() {
        let graph = OptimizedGraph::new();

        let subject = NamedNode::new("http://example.org/s").unwrap();
        let predicate = NamedNode::new("http://example.org/p").unwrap();
        let object = Literal::new("test object");
        let triple = Triple::new(subject.clone(), predicate.clone(), object.clone());

        assert!(graph.insert(&triple));
        assert!(!graph.insert(&triple)); // Duplicate

        let subject_pattern = Subject::NamedNode(subject);
        let predicate_pattern = Predicate::NamedNode(predicate);
        let object_pattern = Object::Literal(object);

        // Query by exact match
        let exact = graph.query(
            Some(&subject_pattern),
            Some(&predicate_pattern),
            Some(&object_pattern),
        );
        assert_eq!(exact.len(), 1);
        assert_eq!(exact[0], triple);

        // Query by subject only
        let by_subject = graph.query(Some(&subject_pattern), None, None);
        assert_eq!(by_subject.len(), 1);

        let stats = graph.stats();
        assert_eq!(stats.triple_count, 1);
    }

    /// `validate_iri_fast` accepts a plain IRI and rejects empty input and
    /// forbidden characters.
    #[test]
    fn test_simd_iri_validation() {
        assert!(simd::validate_iri_fast("http://example.org/test"));

        let rejected = [
            "http://example.org/<invalid>",
            "",
            "http://example.org/test with spaces",
        ];
        for iri in &rejected {
            assert!(!simd::validate_iri_fast(iri), "{iri:?} should be rejected");
        }
    }

    /// `compare_strings_fast` orders equal-length strings bytewise and
    /// shorter strings first.
    #[test]
    fn test_simd_string_comparison() {
        use std::cmp::Ordering::{Equal, Greater, Less};

        let cases = [
            ("abc", "abc", Equal),
            ("abc", "def", Less),
            ("def", "abc", Greater),
            ("short", "longer", Less),
        ];
        for &(left, right, expected) in &cases {
            assert_eq!(simd::compare_strings_fast(left, right), expected);
        }
    }

    /// Resetting an arena zeroes both of its counters.
    #[test]
    fn test_arena_reset() {
        let arena = RdfArena::new();

        arena.alloc_str("test");
        assert!(arena.allocated_bytes() > 0);

        arena.reset();
        assert_eq!(arena.allocated_bytes(), 0);
        assert_eq!(arena.allocation_count(), 0);
    }

    /// Ten threads each insert a distinct triple; every insert succeeds and
    /// the graph ends up with ten triples.
    #[test]
    fn test_concurrent_optimized_graph() {
        use std::sync::Arc;
        use std::thread;

        let graph = Arc::new(OptimizedGraph::new());

        // Start every worker before joining any of them.
        let mut workers = Vec::new();
        for i in 0..10 {
            let graph = Arc::clone(&graph);
            workers.push(thread::spawn(move || {
                let subject = NamedNode::new(format!("http://example.org/s{i}")).unwrap();
                let predicate = NamedNode::new("http://example.org/p").unwrap();
                let object = Literal::new(format!("object{i}"));
                let triple = Triple::new(subject, predicate, object);

                graph.insert(&triple)
            }));
        }

        let mut outcomes: Vec<bool> = Vec::new();
        for worker in workers {
            outcomes.push(worker.join().unwrap());
        }
        assert!(!outcomes.contains(&false));

        let stats = graph.stats();
        assert_eq!(stats.triple_count, 10);
    }
}
1139
/// Zero-copy buffer for efficient data manipulation
///
/// A fixed-capacity byte buffer. The backing storage is allocated once,
/// zero-filled, and never resized; `len` tracks how many leading bytes are
/// considered valid data.
pub struct ZeroCopyBuffer {
    /// Backing storage; its length is the buffer's capacity.
    data: Pin<Box<[u8]>>,
    /// Number of leading bytes of `data` that hold valid content.
    len: usize,
}

impl ZeroCopyBuffer {
    /// Create a new zero-copy buffer with the given capacity
    ///
    /// Equivalent to [`ZeroCopyBuffer::with_capacity`].
    pub fn new(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }

    /// Create a new zero-copy buffer with the given capacity
    ///
    /// The storage is zero-initialised and the buffer starts out empty
    /// (`len() == 0`).
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            data: Pin::new(vec![0u8; capacity].into_boxed_slice()),
            len: 0,
        }
    }

    /// Get a slice of the buffer data
    ///
    /// Only the first `len()` bytes are returned.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[..self.len]
    }

    /// Get a mutable slice of the entire buffer for reading into
    ///
    /// The slice spans the full capacity, not just the valid prefix. Writing
    /// through it does not change `len()`; call [`ZeroCopyBuffer::set_len`]
    /// afterwards to record how many bytes are valid.
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
        &mut self.data[..]
    }

    /// Get the buffer capacity
    pub fn capacity(&self) -> usize {
        self.data.len()
    }

    /// Get the current length of data in the buffer
    pub fn len(&self) -> usize {
        self.len
    }

    /// Check if the buffer is empty
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Clear the buffer
    ///
    /// Only the length is reset; the stored bytes are not overwritten.
    pub fn clear(&mut self) {
        self.len = 0;
    }

    /// Reset the buffer (alias for clear)
    pub fn reset(&mut self) {
        self.clear();
    }

    /// Set the length of valid data in the buffer
    ///
    /// # Panics
    ///
    /// Panics if `len` is greater than the buffer capacity.
    pub fn set_len(&mut self, len: usize) {
        assert!(len <= self.capacity());
        self.len = len;
    }

    /// Write data to the buffer
    ///
    /// Appends as many bytes of `data` as fit after the current contents and
    /// returns how many were copied; this may be fewer than `data.len()`.
    /// Writing an empty slice is a no-op that returns `Ok(0)`.
    ///
    /// # Errors
    ///
    /// Returns an error of kind [`std::io::ErrorKind::WriteZero`] when `data`
    /// is non-empty but the buffer has no free space left.
    pub fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
        // An empty write must not be reported as "buffer is full".
        if data.is_empty() {
            return Ok(0);
        }

        let start = self.len;
        let to_write = data.len().min(self.capacity() - start);

        if to_write == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::WriteZero,
                "Buffer is full",
            ));
        }

        let end = start + to_write;
        self.data[start..end].copy_from_slice(&data[..to_write]);

        self.len = end;
        Ok(to_write)
    }
}
1226
1227/// SIMD JSON processor for fast JSON parsing
1228#[derive(Clone)]
1229pub struct SimdJsonProcessor;
1230
1231impl SimdJsonProcessor {
1232    /// Create a new SIMD JSON processor
1233    pub fn new() -> Self {
1234        SimdJsonProcessor
1235    }
1236
1237    /// Parse JSON bytes into a Value
1238    pub fn parse<'a>(
1239        &mut self,
1240        json: &'a mut [u8],
1241    ) -> Result<simd_json::BorrowedValue<'a>, simd_json::Error> {
1242        simd_json::to_borrowed_value(json)
1243    }
1244
1245    /// Parse JSON string into a Value
1246    pub fn parse_str<'a>(
1247        &mut self,
1248        json: &'a mut str,
1249    ) -> Result<simd_json::BorrowedValue<'a>, simd_json::Error> {
1250        let bytes = unsafe { json.as_bytes_mut() };
1251        simd_json::to_borrowed_value(bytes)
1252    }
1253
1254    /// Parse JSON bytes into an owned Value
1255    pub fn parse_owned(
1256        &mut self,
1257        json: &mut [u8],
1258    ) -> Result<simd_json::OwnedValue, simd_json::Error> {
1259        simd_json::to_owned_value(json)
1260    }
1261
1262    /// Parse JSON bytes into a serde_json::Value (compatibility method)
1263    pub fn parse_json(&self, json: &[u8]) -> Result<serde_json::Value, serde_json::Error> {
1264        serde_json::from_slice(json)
1265    }
1266}
1267
1268impl Default for SimdJsonProcessor {
1269    fn default() -> Self {
1270        Self::new()
1271    }
1272}