Skip to main content

oxirs_core/
graph.rs

1//! RDF graph abstraction and operations
2
3use crate::concurrent::{BatchConfig, BatchOperation, ParallelBatchProcessor};
4use crate::model::*;
5use crate::Result;
6#[cfg(feature = "parallel")]
7use rayon::prelude::*;
8use std::collections::BTreeSet;
9use std::sync::Arc;
10
11/// RDF graph representation
12///
13/// A graph is a collection of RDF triples. This implementation uses a BTreeSet
14/// for efficient storage and retrieval.
15#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct Graph {
17    triples: BTreeSet<Triple>,
18}
19
20impl Graph {
21    /// Create a new empty graph
22    pub fn new() -> Self {
23        Graph {
24            triples: BTreeSet::new(),
25        }
26    }
27
28    /// Create a graph from an iterator of triples
29    pub fn from_triples<I>(triples: I) -> Self
30    where
31        I: IntoIterator<Item = Triple>,
32    {
33        Graph {
34            triples: triples.into_iter().collect(),
35        }
36    }
37
38    /// Add a triple to the graph
39    pub fn add_triple(&mut self, triple: Triple) -> bool {
40        self.triples.insert(triple)
41    }
42
43    /// Add a triple to the graph using string components
44    pub fn add_triple_str(&mut self, subject: &str, predicate: &str, object: &str) -> Result<bool> {
45        let subject_node = NamedNode::new(subject)?;
46        let predicate_node = NamedNode::new(predicate)?;
47        let object_literal = Literal::new(object);
48
49        let triple = Triple::new(subject_node, predicate_node, object_literal);
50        Ok(self.add_triple(triple))
51    }
52
53    /// Remove a triple from the graph
54    pub fn remove_triple(&mut self, triple: &Triple) -> bool {
55        self.triples.remove(triple)
56    }
57
58    /// Check if a triple exists in the graph
59    pub fn contains_triple(&self, triple: &Triple) -> bool {
60        self.triples.contains(triple)
61    }
62
63    /// Query triples matching the given pattern
64    ///
65    /// None values act as wildcards matching any term.
66    pub fn query_triples(
67        &self,
68        subject: Option<&Subject>,
69        predicate: Option<&Predicate>,
70        object: Option<&Object>,
71    ) -> Vec<Triple> {
72        self.triples
73            .iter()
74            .filter(|triple| triple.matches_pattern(subject, predicate, object))
75            .cloned()
76            .collect()
77    }
78
79    /// Get all triples as a vector
80    pub fn triples(&self) -> Vec<Triple> {
81        self.triples.iter().cloned().collect()
82    }
83
84    /// Iterate over all triples
85    pub fn iter_triples(&self) -> impl Iterator<Item = &Triple> {
86        self.triples.iter()
87    }
88
89    /// Alias for add_triple for compatibility
90    pub fn insert(&mut self, triple: Triple) -> bool {
91        self.add_triple(triple)
92    }
93
94    /// Iterate over all triples (alias for iter_triples)
95    pub fn iter(&self) -> impl Iterator<Item = &Triple> {
96        self.triples.iter()
97    }
98
99    /// Check if a triple exists in the graph (alias for contains_triple)
100    pub fn contains(&self, triple: &Triple) -> bool {
101        self.contains_triple(triple)
102    }
103
104    /// Get all subjects in the graph
105    pub fn subjects(&self) -> BTreeSet<Subject> {
106        self.triples.iter().map(|t| t.subject().clone()).collect()
107    }
108
109    /// Get all predicates in the graph
110    pub fn predicates(&self) -> BTreeSet<Predicate> {
111        self.triples.iter().map(|t| t.predicate().clone()).collect()
112    }
113
114    /// Get all objects in the graph
115    pub fn objects(&self) -> BTreeSet<Object> {
116        self.triples.iter().map(|t| t.object().clone()).collect()
117    }
118
119    /// Merge another graph into this one
120    pub fn merge(&mut self, other: &Graph) {
121        for triple in &other.triples {
122            self.triples.insert(triple.clone());
123        }
124    }
125
126    /// Create a new graph containing the union of this graph and another
127    pub fn union(&self, other: &Graph) -> Graph {
128        let mut result = self.clone();
129        result.merge(other);
130        result
131    }
132
133    /// Create a new graph containing the intersection of this graph and another
134    pub fn intersection(&self, other: &Graph) -> Graph {
135        let intersection_triples: BTreeSet<Triple> =
136            self.triples.intersection(&other.triples).cloned().collect();
137
138        Graph {
139            triples: intersection_triples,
140        }
141    }
142
143    /// Create a new graph containing triples in this graph but not in the other
144    pub fn difference(&self, other: &Graph) -> Graph {
145        let difference_triples: BTreeSet<Triple> =
146            self.triples.difference(&other.triples).cloned().collect();
147
148        Graph {
149            triples: difference_triples,
150        }
151    }
152
153    /// Clear all triples from the graph
154    pub fn clear(&mut self) {
155        self.triples.clear();
156    }
157
158    /// Get the number of triples in the graph
159    pub fn len(&self) -> usize {
160        self.triples.len()
161    }
162
163    /// Check if the graph is empty
164    pub fn is_empty(&self) -> bool {
165        self.triples.is_empty()
166    }
167
168    /// Check if the graph is isomorphic to another graph
169    ///
170    /// This is a simplified check that doesn't handle blank node isomorphism properly.
171    /// For proper blank node isomorphism, a more sophisticated algorithm would be needed.
172    pub fn is_isomorphic_to(&self, other: &Graph) -> bool {
173        // Simple implementation: check if both graphs have the same triples
174        // This doesn't handle blank node renaming properly
175        self.triples == other.triples
176    }
177
178    // Parallel batch processing methods
179
180    /// Insert triples in parallel batches
181    ///
182    /// This method uses parallel processing to insert a large collection of triples
183    /// efficiently. It automatically batches the triples and processes them across
184    /// multiple CPU cores.
185    #[cfg(feature = "parallel")]
186    pub fn par_insert_batch(&mut self, triples: Vec<Triple>) -> Result<usize> {
187        if triples.is_empty() {
188            return Ok(0);
189        }
190
191        let config = BatchConfig::auto();
192        let batch_size = config.batch_size;
193        let processor = ParallelBatchProcessor::new(config);
194
195        // Split triples into batches
196        let operations: Vec<_> = triples
197            .par_chunks(batch_size)
198            .map(|chunk| BatchOperation::insert(chunk.to_vec()))
199            .collect();
200
201        // Submit all operations
202        processor.submit_batch(operations)?;
203
204        // Process operations and collect results
205        let all_triples = Arc::new(parking_lot::Mutex::new(Vec::new()));
206
207        let all_triples_clone = all_triples.clone();
208        processor.process(move |op| -> Result<()> {
209            match op {
210                BatchOperation::Insert(batch_triples) => {
211                    all_triples_clone.lock().extend(batch_triples);
212                    Ok(())
213                }
214                _ => Ok(()),
215            }
216        })?;
217
218        // Now insert all triples into the graph
219        let mut inserted = 0;
220        for triple in all_triples.lock().drain(..) {
221            if self.triples.insert(triple) {
222                inserted += 1;
223            }
224        }
225
226        Ok(inserted)
227    }
228
229    /// Remove triples in parallel batches
230    ///
231    /// This method uses parallel processing to remove a large collection of triples
232    /// efficiently. It automatically batches the triples and processes them across
233    /// multiple CPU cores.
234    #[cfg(feature = "parallel")]
235    pub fn par_remove_batch(&mut self, triples: Vec<Triple>) -> Result<usize> {
236        if triples.is_empty() {
237            return Ok(0);
238        }
239
240        let config = BatchConfig::auto();
241        let batch_size = config.batch_size;
242        let processor = ParallelBatchProcessor::new(config);
243
244        // Split triples into batches
245        let operations: Vec<_> = triples
246            .par_chunks(batch_size)
247            .map(|chunk| BatchOperation::remove(chunk.to_vec()))
248            .collect();
249
250        // Submit all operations
251        processor.submit_batch(operations)?;
252
253        // Process operations and collect results
254        let triples_to_remove = Arc::new(parking_lot::Mutex::new(Vec::new()));
255
256        let triples_clone = triples_to_remove.clone();
257        processor.process(move |op| -> Result<()> {
258            match op {
259                BatchOperation::Remove(batch_triples) => {
260                    triples_clone.lock().extend(batch_triples);
261                    Ok(())
262                }
263                _ => Ok(()),
264            }
265        })?;
266
267        // Now remove all triples from the graph
268        let mut removed = 0;
269        for triple in triples_to_remove.lock().drain(..) {
270            if self.triples.remove(&triple) {
271                removed += 1;
272            }
273        }
274
275        Ok(removed)
276    }
277
278    /// Query triples in parallel batches
279    ///
280    /// This method performs multiple queries in parallel, returning all matching triples.
281    /// Each query pattern is processed concurrently for improved performance.
282    #[cfg(feature = "parallel")]
283    pub fn par_query_batch(
284        &self,
285        queries: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
286    ) -> Result<Vec<Vec<Triple>>> {
287        if queries.is_empty() {
288            return Ok(vec![]);
289        }
290
291        let config = BatchConfig::auto();
292        let processor = ParallelBatchProcessor::new(config);
293
294        // Convert queries to operations
295        let operations: Vec<_> = queries
296            .into_iter()
297            .map(|(s, p, o)| BatchOperation::query(s, p, o))
298            .collect();
299
300        // Submit all operations
301        processor.submit_batch(operations)?;
302
303        // Clone the triples for processing
304        let triples = self.triples.clone();
305
306        let results = processor.process(move |op| -> Result<Vec<Triple>> {
307            match op {
308                BatchOperation::Query {
309                    subject,
310                    predicate,
311                    object,
312                } => {
313                    let matching: Vec<Triple> = triples
314                        .iter()
315                        .filter(|triple| {
316                            triple.matches_pattern(
317                                subject.as_ref(),
318                                predicate.as_ref(),
319                                object.as_ref(),
320                            )
321                        })
322                        .cloned()
323                        .collect();
324                    Ok(matching)
325                }
326                _ => Ok(vec![]),
327            }
328        })?;
329
330        Ok(results)
331    }
332
333    /// Apply a transformation function to all triples in parallel
334    ///
335    /// This method applies a transformation function to each triple in the graph
336    /// in parallel. The function can return None to remove a triple or Some(triple)
337    /// to replace it.
338    #[cfg(feature = "parallel")]
339    pub fn par_transform<F>(&mut self, transform_fn: F) -> Result<(usize, usize)>
340    where
341        F: Fn(&Triple) -> Option<Triple> + Send + Sync + 'static,
342    {
343        let triples: Vec<Triple> = self.triples.iter().cloned().collect();
344        if triples.is_empty() {
345            return Ok((0, 0));
346        }
347
348        let transform_fn = Arc::new(transform_fn);
349
350        // Process triples in parallel
351        let results: Vec<(Option<Triple>, Triple)> = triples
352            .par_iter()
353            .map(|triple| {
354                let result = transform_fn(triple);
355                (result, triple.clone())
356            })
357            .collect();
358
359        // Apply transformations
360        let mut transformed = 0;
361        let mut removed = 0;
362
363        for (new_triple, old_triple) in results {
364            match new_triple {
365                Some(new) if new != old_triple => {
366                    self.triples.remove(&old_triple);
367                    self.triples.insert(new);
368                    transformed += 1;
369                }
370                None => {
371                    self.triples.remove(&old_triple);
372                    removed += 1;
373                }
374                _ => {} // No change
375            }
376        }
377
378        Ok((transformed, removed))
379    }
380
381    /// Create a parallel iterator over the graph's triples
382    ///
383    /// This allows for parallel processing of triples using rayon's parallel iterator traits.
384    #[cfg(feature = "parallel")]
385    pub fn par_iter(&self) -> impl ParallelIterator<Item = &Triple> {
386        self.triples.par_iter()
387    }
388
389    /// Count triples matching patterns in parallel
390    ///
391    /// This method counts the number of triples matching each pattern in parallel.
392    #[cfg(feature = "parallel")]
393    pub fn par_count_patterns(
394        &self,
395        patterns: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
396    ) -> Vec<usize> {
397        patterns
398            .par_iter()
399            .map(|(subject, predicate, object)| {
400                self.triples
401                    .iter()
402                    .filter(|triple| {
403                        triple.matches_pattern(
404                            subject.as_ref(),
405                            predicate.as_ref(),
406                            object.as_ref(),
407                        )
408                    })
409                    .count()
410            })
411            .collect()
412    }
413
414    /// Find unique values for a given position in parallel
415    ///
416    /// This method finds all unique subjects, predicates, or objects in parallel.
417    #[cfg(feature = "parallel")]
418    pub fn par_unique_terms(&self) -> (BTreeSet<Subject>, BTreeSet<Predicate>, BTreeSet<Object>) {
419        let terms: Vec<(Subject, Predicate, Object)> = self
420            .triples
421            .par_iter()
422            .map(|triple| {
423                (
424                    triple.subject().clone(),
425                    triple.predicate().clone(),
426                    triple.object().clone(),
427                )
428            })
429            .collect();
430
431        let mut subjects = BTreeSet::new();
432        let mut predicates = BTreeSet::new();
433        let mut objects = BTreeSet::new();
434
435        for (s, p, o) in terms {
436            subjects.insert(s);
437            predicates.insert(p);
438            objects.insert(o);
439        }
440
441        (subjects, predicates, objects)
442    }
443}
444
445impl Default for Graph {
446    fn default() -> Self {
447        Self::new()
448    }
449}
450
451/// Iterator over triples in a graph
452pub struct GraphIter<'a> {
453    inner: std::collections::btree_set::Iter<'a, Triple>,
454}
455
456impl<'a> Iterator for GraphIter<'a> {
457    type Item = &'a Triple;
458
459    fn next(&mut self) -> Option<Self::Item> {
460        self.inner.next()
461    }
462}
463
464impl<'a> IntoIterator for &'a Graph {
465    type Item = &'a Triple;
466    type IntoIter = GraphIter<'a>;
467
468    fn into_iter(self) -> Self::IntoIter {
469        GraphIter {
470            inner: self.triples.iter(),
471        }
472    }
473}
474
475impl IntoIterator for Graph {
476    type Item = Triple;
477    type IntoIter = std::collections::btree_set::IntoIter<Triple>;
478
479    fn into_iter(self) -> Self::IntoIter {
480        self.triples.into_iter()
481    }
482}
483
484impl FromIterator<Triple> for Graph {
485    fn from_iter<I: IntoIterator<Item = Triple>>(iter: I) -> Self {
486        Graph {
487            triples: iter.into_iter().collect(),
488        }
489    }
490}
491
492impl Extend<Triple> for Graph {
493    fn extend<I: IntoIterator<Item = Triple>>(&mut self, iter: I) {
494        self.triples.extend(iter);
495    }
496}
497
498#[cfg(test)]
499mod tests {
500    use super::*;
501    use std::time::Instant;
502
503    fn create_test_triple(id: usize) -> Triple {
504        Triple::new(
505            Subject::NamedNode(
506                NamedNode::new(format!("http://subject/{id}")).expect("valid IRI from format"),
507            ),
508            Predicate::NamedNode(
509                NamedNode::new(format!("http://predicate/{id}")).expect("valid IRI from format"),
510            ),
511            Object::NamedNode(
512                NamedNode::new(format!("http://object/{id}")).expect("valid IRI from format"),
513            ),
514        )
515    }
516
517    fn create_test_triples(count: usize) -> Vec<Triple> {
518        (0..count).map(create_test_triple).collect()
519    }
520
521    #[test]
522    fn test_par_insert_batch() {
523        let mut graph = Graph::new();
524        let triples = create_test_triples(10000);
525
526        let start = Instant::now();
527        let inserted = graph
528            .par_insert_batch(triples.clone())
529            .expect("parallel batch insert should succeed");
530        let duration = start.elapsed();
531
532        println!("Parallel insert of 10000 triples took: {duration:?}");
533        assert_eq!(inserted, 10000);
534        assert_eq!(graph.len(), 10000);
535
536        // Verify all triples are present
537        for triple in &triples {
538            assert!(graph.contains_triple(triple));
539        }
540    }
541
542    #[test]
543    fn test_par_insert_batch_with_duplicates() {
544        let mut graph = Graph::new();
545        let mut triples = create_test_triples(5000);
546        // Add duplicates
547        triples.extend(create_test_triples(2500));
548
549        let inserted = graph
550            .par_insert_batch(triples)
551            .expect("graph operation should succeed");
552
553        // Should only insert unique triples
554        assert_eq!(inserted, 5000);
555        assert_eq!(graph.len(), 5000);
556    }
557
558    #[test]
559    fn test_par_remove_batch() {
560        let mut graph = Graph::new();
561        let triples = create_test_triples(10000);
562        graph.extend(triples.clone());
563
564        // Remove half of them
565        let to_remove: Vec<Triple> = triples.iter().step_by(2).cloned().collect();
566
567        let start = Instant::now();
568        let removed = graph
569            .par_remove_batch(to_remove.clone())
570            .expect("parallel batch remove should succeed");
571        let duration = start.elapsed();
572
573        println!("Parallel remove of 5000 triples took: {duration:?}");
574        assert_eq!(removed, 5000);
575        assert_eq!(graph.len(), 5000);
576
577        // Verify correct triples were removed
578        for (i, triple) in triples.iter().enumerate() {
579            if i % 2 == 0 {
580                assert!(!graph.contains_triple(triple));
581            } else {
582                assert!(graph.contains_triple(triple));
583            }
584        }
585    }
586
587    #[test]
588    fn test_par_query_batch() {
589        let mut graph = Graph::new();
590        let triples = create_test_triples(1000);
591        graph.extend(triples);
592
593        // Create multiple query patterns
594        let queries: Vec<_> = (0..100)
595            .map(|i| {
596                (
597                    Some(Subject::NamedNode(
598                        NamedNode::new(format!("http://subject/{i}"))
599                            .expect("valid IRI from format"),
600                    )),
601                    None,
602                    None,
603                )
604            })
605            .collect();
606
607        let start = Instant::now();
608        let results = graph
609            .par_query_batch(queries)
610            .expect("graph operation should succeed");
611        let duration = start.elapsed();
612
613        println!("Parallel query of 100 patterns took: {duration:?}");
614        assert_eq!(results.len(), 100);
615
616        // Each query should match exactly one triple
617        for (i, result) in results.iter().enumerate() {
618            if i < 1000 {
619                assert_eq!(result.len(), 1);
620            } else {
621                assert_eq!(result.len(), 0);
622            }
623        }
624    }
625
626    #[test]
627    fn test_par_transform() {
628        let mut graph = Graph::new();
629        let triples = create_test_triples(1000);
630        graph.extend(triples);
631
632        // Transform function: change predicate for even subjects
633        let transform_fn = |triple: &Triple| -> Option<Triple> {
634            if let Subject::NamedNode(node) = triple.subject() {
635                let uri = node.as_str();
636                if let Some(id_str) = uri.strip_prefix("http://subject/") {
637                    if let Ok(id) = id_str.parse::<usize>() {
638                        if id % 2 == 0 {
639                            // Transform: change predicate
640                            return Some(Triple::new(
641                                triple.subject().clone(),
642                                Predicate::NamedNode(
643                                    NamedNode::new("http://predicate/transformed")
644                                        .expect("valid IRI"),
645                                ),
646                                triple.object().clone(),
647                            ));
648                        } else if id % 3 == 0 {
649                            // Remove
650                            return None;
651                        }
652                    }
653                }
654            }
655            Some(triple.clone())
656        };
657
658        let start = Instant::now();
659        let (transformed, removed) = graph
660            .par_transform(transform_fn)
661            .expect("graph operation should succeed");
662        let duration = start.elapsed();
663
664        println!("Parallel transform took: {duration:?}");
665        println!("Transformed: {transformed}, Removed: {removed}");
666
667        // Verify transformations
668        let transformed_predicate = Predicate::NamedNode(
669            NamedNode::new("http://predicate/transformed").expect("valid IRI"),
670        );
671        let transformed_count = graph
672            .query_triples(None, Some(&transformed_predicate), None)
673            .len();
674        assert!(transformed_count > 0);
675    }
676
677    #[test]
678    fn test_par_count_patterns() {
679        let mut graph = Graph::new();
680
681        // Create triples with different patterns
682        for i in 0..100 {
683            for j in 0..10 {
684                let triple = Triple::new(
685                    Subject::NamedNode(
686                        NamedNode::new(format!("http://subject/{i}"))
687                            .expect("valid IRI from format"),
688                    ),
689                    Predicate::NamedNode(
690                        NamedNode::new(format!("http://predicate/{j}"))
691                            .expect("valid IRI from format"),
692                    ),
693                    Object::NamedNode(
694                        NamedNode::new(format!("http://object/{}", i * 10 + j))
695                            .expect("valid IRI from format"),
696                    ),
697                );
698                graph.add_triple(triple);
699            }
700        }
701
702        // Count patterns
703        let patterns: Vec<_> = (0..10)
704            .map(|i| {
705                (
706                    None,
707                    Some(Predicate::NamedNode(
708                        NamedNode::new(format!("http://predicate/{i}"))
709                            .expect("valid IRI from format"),
710                    )),
711                    None,
712                )
713            })
714            .collect();
715
716        let counts = graph.par_count_patterns(patterns);
717
718        // Each predicate should appear 100 times
719        for count in counts {
720            assert_eq!(count, 100);
721        }
722    }
723
724    #[test]
725    fn test_par_unique_terms() {
726        let mut graph = Graph::new();
727        let triples = create_test_triples(1000);
728        graph.extend(triples);
729
730        let start = Instant::now();
731        let (subjects, predicates, objects) = graph.par_unique_terms();
732        let duration = start.elapsed();
733
734        println!("Parallel unique terms extraction took: {duration:?}");
735
736        assert_eq!(subjects.len(), 1000);
737        assert_eq!(predicates.len(), 1000);
738        assert_eq!(objects.len(), 1000);
739    }
740
741    #[test]
742    fn test_par_iter() {
743        let mut graph = Graph::new();
744        let triples = create_test_triples(1000);
745        graph.extend(triples);
746
747        // Count triples using parallel iterator
748        let count = graph.par_iter().count();
749        assert_eq!(count, 1000);
750
751        // Filter using parallel iterator
752        let filtered: Vec<_> = graph
753            .par_iter()
754            .filter(|triple| {
755                if let Subject::NamedNode(node) = triple.subject() {
756                    node.as_str().ends_with("0")
757                } else {
758                    false
759                }
760            })
761            .cloned()
762            .collect();
763
764        assert_eq!(filtered.len(), 100);
765    }
766
767    #[test]
768    fn test_parallel_performance_comparison() {
769        let triple_count = 50000;
770        let triples = create_test_triples(triple_count);
771
772        // Sequential insert
773        let mut graph1 = Graph::new();
774        let start = Instant::now();
775        for triple in &triples {
776            graph1.add_triple(triple.clone());
777        }
778        let seq_duration = start.elapsed();
779
780        // Parallel insert
781        let mut graph2 = Graph::new();
782        let start = Instant::now();
783        graph2
784            .par_insert_batch(triples.clone())
785            .expect("parallel batch insert should succeed");
786        let par_duration = start.elapsed();
787
788        println!("Performance comparison for {triple_count} triples:");
789        println!("  Sequential insert: {seq_duration:?}");
790        println!("  Parallel insert: {par_duration:?}");
791        println!(
792            "  Speedup: {:.2}x",
793            seq_duration.as_secs_f64() / par_duration.as_secs_f64()
794        );
795
796        assert_eq!(graph1.len(), graph2.len());
797    }
798
799    #[test]
800    fn test_empty_operations() {
801        let mut graph = Graph::new();
802
803        // Test empty insert
804        let inserted = graph
805            .par_insert_batch(vec![])
806            .expect("graph operation should succeed");
807        assert_eq!(inserted, 0);
808
809        // Test empty remove
810        let removed = graph
811            .par_remove_batch(vec![])
812            .expect("graph operation should succeed");
813        assert_eq!(removed, 0);
814
815        // Test empty query
816        let results = graph
817            .par_query_batch(vec![])
818            .expect("graph operation should succeed");
819        assert!(results.is_empty());
820
821        // Test empty transform
822        let (transformed, removed) = graph
823            .par_transform(|t| Some(t.clone()))
824            .expect("parallel transform should succeed");
825        assert_eq!(transformed, 0);
826        assert_eq!(removed, 0);
827    }
828}
829
830/// Thread-safe concurrent graph for multi-threaded access
831///
832/// This struct wraps the Graph in an Arc<RwLock<_>> to provide safe concurrent
833/// access across multiple threads. It implements reader-writer semantics where
834/// multiple readers can access the graph simultaneously, but only one writer
835/// can modify it at a time.
836#[derive(Debug, Clone)]
837pub struct ConcurrentGraph {
838    inner: Arc<parking_lot::RwLock<Graph>>,
839}
840
841impl ConcurrentGraph {
842    /// Create a new empty concurrent graph
843    pub fn new() -> Self {
844        Self {
845            inner: Arc::new(parking_lot::RwLock::new(Graph::new())),
846        }
847    }
848
849    /// Create a concurrent graph from an existing graph
850    pub fn from_graph(graph: Graph) -> Self {
851        Self {
852            inner: Arc::new(parking_lot::RwLock::new(graph)),
853        }
854    }
855
856    /// Add a triple to the graph (thread-safe)
857    pub fn add_triple(&self, triple: Triple) -> bool {
858        self.inner.write().add_triple(triple)
859    }
860
861    /// Add multiple triples atomically
862    pub fn add_triples(&self, triples: Vec<Triple>) -> usize {
863        let mut graph = self.inner.write();
864        let mut added = 0;
865        for triple in triples {
866            if graph.add_triple(triple) {
867                added += 1;
868            }
869        }
870        added
871    }
872
873    /// Remove a triple from the graph (thread-safe)
874    pub fn remove_triple(&self, triple: &Triple) -> bool {
875        self.inner.write().remove_triple(triple)
876    }
877
878    /// Check if a triple exists in the graph (thread-safe read)
879    pub fn contains_triple(&self, triple: &Triple) -> bool {
880        self.inner.read().contains_triple(triple)
881    }
882
883    /// Query triples matching the given pattern (thread-safe read)
884    pub fn query_triples(
885        &self,
886        subject: Option<&Subject>,
887        predicate: Option<&Predicate>,
888        object: Option<&Object>,
889    ) -> Vec<Triple> {
890        self.inner.read().query_triples(subject, predicate, object)
891    }
892
893    /// Get the number of triples (thread-safe read)
894    pub fn len(&self) -> usize {
895        self.inner.read().len()
896    }
897
898    /// Check if the graph is empty (thread-safe read)
899    pub fn is_empty(&self) -> bool {
900        self.inner.read().is_empty()
901    }
902
903    /// Get all triples as a vector (thread-safe read)
904    pub fn triples(&self) -> Vec<Triple> {
905        self.inner.read().triples()
906    }
907
908    /// Merge another graph into this one (thread-safe)
909    pub fn merge(&self, other: &Graph) {
910        self.inner.write().merge(other)
911    }
912
913    /// Merge another concurrent graph into this one (thread-safe)
914    pub fn merge_concurrent(&self, other: &ConcurrentGraph) {
915        let other_triples = other.triples();
916        let mut graph = self.inner.write();
917        for triple in other_triples {
918            graph.add_triple(triple);
919        }
920    }
921
922    /// Create a union with another graph (thread-safe read)
923    pub fn union(&self, other: &Graph) -> Graph {
924        self.inner.read().union(other)
925    }
926
927    /// Create an intersection with another graph (thread-safe read)
928    pub fn intersection(&self, other: &Graph) -> Graph {
929        self.inner.read().intersection(other)
930    }
931
932    /// Clear all triples from the graph (thread-safe)
933    pub fn clear(&self) {
934        self.inner.write().clear()
935    }
936
937    /// Execute a read operation with access to the underlying graph
938    pub fn with_read<F, R>(&self, f: F) -> R
939    where
940        F: FnOnce(&Graph) -> R,
941    {
942        let graph = self.inner.read();
943        f(&graph)
944    }
945
946    /// Execute a write operation with access to the underlying graph
947    pub fn with_write<F, R>(&self, f: F) -> R
948    where
949        F: FnOnce(&mut Graph) -> R,
950    {
951        let mut graph = self.inner.write();
952        f(&mut graph)
953    }
954
955    /// Parallel batch insert (thread-safe)
956    #[cfg(feature = "parallel")]
957    pub fn par_insert_batch(&self, triples: Vec<Triple>) -> Result<usize> {
958        self.inner.write().par_insert_batch(triples)
959    }
960
961    /// Parallel batch remove (thread-safe)
962    #[cfg(feature = "parallel")]
963    pub fn par_remove_batch(&self, triples: Vec<Triple>) -> Result<usize> {
964        self.inner.write().par_remove_batch(triples)
965    }
966
967    /// Parallel batch query (thread-safe read)
968    #[cfg(feature = "parallel")]
969    pub fn par_query_batch(
970        &self,
971        queries: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
972    ) -> Result<Vec<Vec<Triple>>> {
973        self.inner.read().par_query_batch(queries)
974    }
975
976    /// Get subjects concurrently
977    pub fn subjects(&self) -> BTreeSet<Subject> {
978        self.inner.read().subjects()
979    }
980
981    /// Get predicates concurrently
982    pub fn predicates(&self) -> BTreeSet<Predicate> {
983        self.inner.read().predicates()
984    }
985
986    /// Get objects concurrently
987    pub fn objects(&self) -> BTreeSet<Object> {
988        self.inner.read().objects()
989    }
990}
991
992impl Default for ConcurrentGraph {
993    fn default() -> Self {
994        Self::new()
995    }
996}
997
998/// Thread pool for concurrent graph operations
999#[allow(dead_code)]
1000pub struct GraphThreadPool {
1001    #[cfg(feature = "parallel")]
1002    pool: rayon::ThreadPool,
1003    max_batch_size: usize,
1004}
1005
1006impl GraphThreadPool {
1007    /// Create a new graph thread pool
1008    pub fn new() -> Result<Self> {
1009        #[cfg(feature = "parallel")]
1010        {
1011            let pool = rayon::ThreadPoolBuilder::new()
1012                .num_threads(num_cpus::get())
1013                .thread_name(|index| format!("oxirs-graph-{index}"))
1014                .build()
1015                .map_err(|e| crate::OxirsError::ConcurrencyError(e.to_string()))?;
1016
1017            Ok(Self {
1018                pool,
1019                max_batch_size: 10_000,
1020            })
1021        }
1022        #[cfg(not(feature = "parallel"))]
1023        {
1024            Ok(Self {
1025                max_batch_size: 10_000,
1026            })
1027        }
1028    }
1029
1030    /// Create a thread pool with custom configuration
1031    pub fn with_config(num_threads: usize, max_batch_size: usize) -> Result<Self> {
1032        #[cfg(feature = "parallel")]
1033        {
1034            let pool = rayon::ThreadPoolBuilder::new()
1035                .num_threads(num_threads)
1036                .thread_name(|index| format!("oxirs-graph-{index}"))
1037                .build()
1038                .map_err(|e| crate::OxirsError::ConcurrencyError(e.to_string()))?;
1039
1040            Ok(Self {
1041                pool,
1042                max_batch_size,
1043            })
1044        }
1045        #[cfg(not(feature = "parallel"))]
1046        {
1047            Ok(Self { max_batch_size })
1048        }
1049    }
1050
1051    /// Process triples concurrently
1052    pub fn process_triples<F, R>(&self, triples: Vec<Triple>, processor: F) -> Vec<R>
1053    where
1054        F: Fn(Triple) -> R + Sync + Send,
1055        R: Send,
1056    {
1057        #[cfg(feature = "parallel")]
1058        {
1059            self.pool
1060                .install(|| triples.into_par_iter().map(processor).collect())
1061        }
1062        #[cfg(not(feature = "parallel"))]
1063        {
1064            triples.into_iter().map(processor).collect()
1065        }
1066    }
1067
1068    /// Process graph operations concurrently
1069    pub fn process_graphs<F, R>(&self, graphs: Vec<Graph>, processor: F) -> Vec<R>
1070    where
1071        F: Fn(Graph) -> R + Sync + Send,
1072        R: Send,
1073    {
1074        #[cfg(feature = "parallel")]
1075        {
1076            self.pool
1077                .install(|| graphs.into_par_iter().map(processor).collect())
1078        }
1079        #[cfg(not(feature = "parallel"))]
1080        {
1081            graphs.into_iter().map(processor).collect()
1082        }
1083    }
1084
1085    /// Parallel merge multiple graphs
1086    pub fn merge_graphs(&self, graphs: Vec<Graph>) -> Graph {
1087        if graphs.is_empty() {
1088            return Graph::new();
1089        }
1090
1091        #[cfg(feature = "parallel")]
1092        {
1093            self.pool.install(|| {
1094                graphs.into_par_iter().reduce(Graph::new, |mut acc, graph| {
1095                    acc.merge(&graph);
1096                    acc
1097                })
1098            })
1099        }
1100        #[cfg(not(feature = "parallel"))]
1101        {
1102            graphs.into_iter().fold(Graph::new(), |mut acc, graph| {
1103                acc.merge(&graph);
1104                acc
1105            })
1106        }
1107    }
1108
1109    /// Get the underlying thread pool (only available with parallel feature)
1110    #[cfg(feature = "parallel")]
1111    pub fn inner(&self) -> &rayon::ThreadPool {
1112        &self.pool
1113    }
1114}
1115
1116impl Default for GraphThreadPool {
1117    fn default() -> Self {
1118        Self::new().expect("Failed to create default thread pool")
1119    }
1120}
1121
1122#[cfg(test)]
1123mod concurrent_tests {
1124    use super::*;
1125    use std::sync::atomic::{AtomicUsize, Ordering};
1126    use std::thread;
1127    use std::time::Duration;
1128
1129    #[test]
1130    fn test_concurrent_graph_basic_operations() {
1131        let graph = ConcurrentGraph::new();
1132
1133        let triple = Triple::new(
1134            NamedNode::new("http://example.org/s").expect("valid IRI"),
1135            NamedNode::new("http://example.org/p").expect("valid IRI"),
1136            Literal::new("test"),
1137        );
1138
1139        // Test basic operations
1140        assert!(graph.add_triple(triple.clone()));
1141        assert!(graph.contains_triple(&triple));
1142        assert_eq!(graph.len(), 1);
1143        assert!(!graph.is_empty());
1144
1145        // Test removal
1146        assert!(graph.remove_triple(&triple));
1147        assert!(!graph.contains_triple(&triple));
1148        assert_eq!(graph.len(), 0);
1149        assert!(graph.is_empty());
1150    }
1151
1152    #[test]
1153    fn test_concurrent_access() {
1154        let graph = ConcurrentGraph::new();
1155
1156        let counter = Arc::new(AtomicUsize::new(0));
1157
1158        // Spawn multiple reader threads
1159        let mut handles = vec![];
1160
1161        for i in 0..10 {
1162            let g = graph.clone();
1163            let c = counter.clone();
1164
1165            handles.push(thread::spawn(move || {
1166                for j in 0..100 {
1167                    let triple = Triple::new(
1168                        NamedNode::new(format!("http://example.org/s{}", i * 100 + j))
1169                            .expect("valid IRI from format"),
1170                        NamedNode::new("http://example.org/p").expect("valid IRI"),
1171                        Literal::new(format!("value{j}")),
1172                    );
1173
1174                    if g.add_triple(triple) {
1175                        c.fetch_add(1, Ordering::Relaxed);
1176                    }
1177
1178                    // Small delay to encourage interleaving
1179                    thread::sleep(Duration::from_nanos(1));
1180                }
1181            }));
1182        }
1183
1184        // Wait for all threads to complete
1185        for handle in handles {
1186            handle.join().expect("thread should not panic");
1187        }
1188
1189        // Verify results
1190        assert_eq!(counter.load(Ordering::Relaxed), 1000);
1191        assert_eq!(graph.len(), 1000);
1192    }
1193
1194    #[test]
1195    fn test_concurrent_graph_merge() {
1196        let graph1 = ConcurrentGraph::new();
1197        let graph2 = ConcurrentGraph::new();
1198
1199        // Add different triples to each graph
1200        for i in 0..100 {
1201            let triple1 = Triple::new(
1202                NamedNode::new(format!("http://example.org/s1_{i}"))
1203                    .expect("valid IRI from format"),
1204                NamedNode::new("http://example.org/p").expect("valid IRI"),
1205                Literal::new(format!("value{i}")),
1206            );
1207            graph1.add_triple(triple1);
1208
1209            let triple2 = Triple::new(
1210                NamedNode::new(format!("http://example.org/s2_{i}"))
1211                    .expect("valid IRI from format"),
1212                NamedNode::new("http://example.org/p").expect("valid IRI"),
1213                Literal::new(format!("value{i}")),
1214            );
1215            graph2.add_triple(triple2);
1216        }
1217
1218        // Merge graphs
1219        graph1.merge_concurrent(&graph2);
1220
1221        assert_eq!(graph1.len(), 200);
1222        assert_eq!(graph2.len(), 100);
1223    }
1224
1225    #[test]
1226    fn test_graph_thread_pool() {
1227        let pool = GraphThreadPool::new().expect("thread pool creation should succeed");
1228
1229        // Create test triples
1230        let triples: Vec<Triple> = (0..1000)
1231            .map(|i| {
1232                Triple::new(
1233                    NamedNode::new(format!("http://example.org/s{i}"))
1234                        .expect("valid IRI from format"),
1235                    NamedNode::new("http://example.org/p").expect("valid IRI"),
1236                    Literal::new(format!("value{i}")),
1237                )
1238            })
1239            .collect();
1240
1241        // Process triples concurrently
1242        let results = pool.process_triples(triples.clone(), |triple| {
1243            // Simulate some processing
1244            triple.to_string().len()
1245        });
1246
1247        assert_eq!(results.len(), 1000);
1248        assert!(results.iter().all(|&len| len > 0));
1249    }
1250
1251    #[test]
1252    fn test_concurrent_with_operations() {
1253        let graph = ConcurrentGraph::new();
1254
1255        // Test with_read
1256        let initial_len = graph.with_read(|g| g.len());
1257        assert_eq!(initial_len, 0);
1258
1259        // Test with_write
1260        graph.with_write(|g| {
1261            for i in 0..10 {
1262                let triple = Triple::new(
1263                    NamedNode::new(format!("http://example.org/s{i}"))
1264                        .expect("valid IRI from format"),
1265                    NamedNode::new("http://example.org/p").expect("valid IRI"),
1266                    Literal::new(format!("value{i}")),
1267                );
1268                g.add_triple(triple);
1269            }
1270        });
1271
1272        let final_len = graph.with_read(|g| g.len());
1273        assert_eq!(final_len, 10);
1274    }
1275}