1use crate::concurrent::{BatchConfig, BatchOperation, ParallelBatchProcessor};
4use crate::model::*;
5use crate::Result;
6#[cfg(feature = "parallel")]
7use rayon::prelude::*;
8use std::collections::BTreeSet;
9use std::sync::Arc;
10
11#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct Graph {
17 triples: BTreeSet<Triple>,
18}
19
20impl Graph {
21 pub fn new() -> Self {
23 Graph {
24 triples: BTreeSet::new(),
25 }
26 }
27
28 pub fn from_triples<I>(triples: I) -> Self
30 where
31 I: IntoIterator<Item = Triple>,
32 {
33 Graph {
34 triples: triples.into_iter().collect(),
35 }
36 }
37
38 pub fn add_triple(&mut self, triple: Triple) -> bool {
40 self.triples.insert(triple)
41 }
42
43 pub fn add_triple_str(&mut self, subject: &str, predicate: &str, object: &str) -> Result<bool> {
45 let subject_node = NamedNode::new(subject)?;
46 let predicate_node = NamedNode::new(predicate)?;
47 let object_literal = Literal::new(object);
48
49 let triple = Triple::new(subject_node, predicate_node, object_literal);
50 Ok(self.add_triple(triple))
51 }
52
53 pub fn remove_triple(&mut self, triple: &Triple) -> bool {
55 self.triples.remove(triple)
56 }
57
58 pub fn contains_triple(&self, triple: &Triple) -> bool {
60 self.triples.contains(triple)
61 }
62
63 pub fn query_triples(
67 &self,
68 subject: Option<&Subject>,
69 predicate: Option<&Predicate>,
70 object: Option<&Object>,
71 ) -> Vec<Triple> {
72 self.triples
73 .iter()
74 .filter(|triple| triple.matches_pattern(subject, predicate, object))
75 .cloned()
76 .collect()
77 }
78
79 pub fn triples(&self) -> Vec<Triple> {
81 self.triples.iter().cloned().collect()
82 }
83
84 pub fn iter_triples(&self) -> impl Iterator<Item = &Triple> {
86 self.triples.iter()
87 }
88
89 pub fn insert(&mut self, triple: Triple) -> bool {
91 self.add_triple(triple)
92 }
93
94 pub fn iter(&self) -> impl Iterator<Item = &Triple> {
96 self.triples.iter()
97 }
98
99 pub fn contains(&self, triple: &Triple) -> bool {
101 self.contains_triple(triple)
102 }
103
104 pub fn subjects(&self) -> BTreeSet<Subject> {
106 self.triples.iter().map(|t| t.subject().clone()).collect()
107 }
108
109 pub fn predicates(&self) -> BTreeSet<Predicate> {
111 self.triples.iter().map(|t| t.predicate().clone()).collect()
112 }
113
114 pub fn objects(&self) -> BTreeSet<Object> {
116 self.triples.iter().map(|t| t.object().clone()).collect()
117 }
118
119 pub fn merge(&mut self, other: &Graph) {
121 for triple in &other.triples {
122 self.triples.insert(triple.clone());
123 }
124 }
125
126 pub fn union(&self, other: &Graph) -> Graph {
128 let mut result = self.clone();
129 result.merge(other);
130 result
131 }
132
133 pub fn intersection(&self, other: &Graph) -> Graph {
135 let intersection_triples: BTreeSet<Triple> =
136 self.triples.intersection(&other.triples).cloned().collect();
137
138 Graph {
139 triples: intersection_triples,
140 }
141 }
142
143 pub fn difference(&self, other: &Graph) -> Graph {
145 let difference_triples: BTreeSet<Triple> =
146 self.triples.difference(&other.triples).cloned().collect();
147
148 Graph {
149 triples: difference_triples,
150 }
151 }
152
153 pub fn clear(&mut self) {
155 self.triples.clear();
156 }
157
158 pub fn len(&self) -> usize {
160 self.triples.len()
161 }
162
163 pub fn is_empty(&self) -> bool {
165 self.triples.is_empty()
166 }
167
168 pub fn is_isomorphic_to(&self, other: &Graph) -> bool {
173 self.triples == other.triples
176 }
177
178 #[cfg(feature = "parallel")]
186 pub fn par_insert_batch(&mut self, triples: Vec<Triple>) -> Result<usize> {
187 if triples.is_empty() {
188 return Ok(0);
189 }
190
191 let config = BatchConfig::auto();
192 let batch_size = config.batch_size;
193 let processor = ParallelBatchProcessor::new(config);
194
195 let operations: Vec<_> = triples
197 .par_chunks(batch_size)
198 .map(|chunk| BatchOperation::insert(chunk.to_vec()))
199 .collect();
200
201 processor.submit_batch(operations)?;
203
204 let all_triples = Arc::new(parking_lot::Mutex::new(Vec::new()));
206
207 let all_triples_clone = all_triples.clone();
208 processor.process(move |op| -> Result<()> {
209 match op {
210 BatchOperation::Insert(batch_triples) => {
211 all_triples_clone.lock().extend(batch_triples);
212 Ok(())
213 }
214 _ => Ok(()),
215 }
216 })?;
217
218 let mut inserted = 0;
220 for triple in all_triples.lock().drain(..) {
221 if self.triples.insert(triple) {
222 inserted += 1;
223 }
224 }
225
226 Ok(inserted)
227 }
228
229 #[cfg(feature = "parallel")]
235 pub fn par_remove_batch(&mut self, triples: Vec<Triple>) -> Result<usize> {
236 if triples.is_empty() {
237 return Ok(0);
238 }
239
240 let config = BatchConfig::auto();
241 let batch_size = config.batch_size;
242 let processor = ParallelBatchProcessor::new(config);
243
244 let operations: Vec<_> = triples
246 .par_chunks(batch_size)
247 .map(|chunk| BatchOperation::remove(chunk.to_vec()))
248 .collect();
249
250 processor.submit_batch(operations)?;
252
253 let triples_to_remove = Arc::new(parking_lot::Mutex::new(Vec::new()));
255
256 let triples_clone = triples_to_remove.clone();
257 processor.process(move |op| -> Result<()> {
258 match op {
259 BatchOperation::Remove(batch_triples) => {
260 triples_clone.lock().extend(batch_triples);
261 Ok(())
262 }
263 _ => Ok(()),
264 }
265 })?;
266
267 let mut removed = 0;
269 for triple in triples_to_remove.lock().drain(..) {
270 if self.triples.remove(&triple) {
271 removed += 1;
272 }
273 }
274
275 Ok(removed)
276 }
277
278 #[cfg(feature = "parallel")]
283 pub fn par_query_batch(
284 &self,
285 queries: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
286 ) -> Result<Vec<Vec<Triple>>> {
287 if queries.is_empty() {
288 return Ok(vec![]);
289 }
290
291 let config = BatchConfig::auto();
292 let processor = ParallelBatchProcessor::new(config);
293
294 let operations: Vec<_> = queries
296 .into_iter()
297 .map(|(s, p, o)| BatchOperation::query(s, p, o))
298 .collect();
299
300 processor.submit_batch(operations)?;
302
303 let triples = self.triples.clone();
305
306 let results = processor.process(move |op| -> Result<Vec<Triple>> {
307 match op {
308 BatchOperation::Query {
309 subject,
310 predicate,
311 object,
312 } => {
313 let matching: Vec<Triple> = triples
314 .iter()
315 .filter(|triple| {
316 triple.matches_pattern(
317 subject.as_ref(),
318 predicate.as_ref(),
319 object.as_ref(),
320 )
321 })
322 .cloned()
323 .collect();
324 Ok(matching)
325 }
326 _ => Ok(vec![]),
327 }
328 })?;
329
330 Ok(results)
331 }
332
333 #[cfg(feature = "parallel")]
339 pub fn par_transform<F>(&mut self, transform_fn: F) -> Result<(usize, usize)>
340 where
341 F: Fn(&Triple) -> Option<Triple> + Send + Sync + 'static,
342 {
343 let triples: Vec<Triple> = self.triples.iter().cloned().collect();
344 if triples.is_empty() {
345 return Ok((0, 0));
346 }
347
348 let transform_fn = Arc::new(transform_fn);
349
350 let results: Vec<(Option<Triple>, Triple)> = triples
352 .par_iter()
353 .map(|triple| {
354 let result = transform_fn(triple);
355 (result, triple.clone())
356 })
357 .collect();
358
359 let mut transformed = 0;
361 let mut removed = 0;
362
363 for (new_triple, old_triple) in results {
364 match new_triple {
365 Some(new) if new != old_triple => {
366 self.triples.remove(&old_triple);
367 self.triples.insert(new);
368 transformed += 1;
369 }
370 None => {
371 self.triples.remove(&old_triple);
372 removed += 1;
373 }
374 _ => {} }
376 }
377
378 Ok((transformed, removed))
379 }
380
381 #[cfg(feature = "parallel")]
385 pub fn par_iter(&self) -> impl ParallelIterator<Item = &Triple> {
386 self.triples.par_iter()
387 }
388
389 #[cfg(feature = "parallel")]
393 pub fn par_count_patterns(
394 &self,
395 patterns: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
396 ) -> Vec<usize> {
397 patterns
398 .par_iter()
399 .map(|(subject, predicate, object)| {
400 self.triples
401 .iter()
402 .filter(|triple| {
403 triple.matches_pattern(
404 subject.as_ref(),
405 predicate.as_ref(),
406 object.as_ref(),
407 )
408 })
409 .count()
410 })
411 .collect()
412 }
413
414 #[cfg(feature = "parallel")]
418 pub fn par_unique_terms(&self) -> (BTreeSet<Subject>, BTreeSet<Predicate>, BTreeSet<Object>) {
419 let terms: Vec<(Subject, Predicate, Object)> = self
420 .triples
421 .par_iter()
422 .map(|triple| {
423 (
424 triple.subject().clone(),
425 triple.predicate().clone(),
426 triple.object().clone(),
427 )
428 })
429 .collect();
430
431 let mut subjects = BTreeSet::new();
432 let mut predicates = BTreeSet::new();
433 let mut objects = BTreeSet::new();
434
435 for (s, p, o) in terms {
436 subjects.insert(s);
437 predicates.insert(p);
438 objects.insert(o);
439 }
440
441 (subjects, predicates, objects)
442 }
443}
444
445impl Default for Graph {
446 fn default() -> Self {
447 Self::new()
448 }
449}
450
451pub struct GraphIter<'a> {
453 inner: std::collections::btree_set::Iter<'a, Triple>,
454}
455
456impl<'a> Iterator for GraphIter<'a> {
457 type Item = &'a Triple;
458
459 fn next(&mut self) -> Option<Self::Item> {
460 self.inner.next()
461 }
462}
463
464impl<'a> IntoIterator for &'a Graph {
465 type Item = &'a Triple;
466 type IntoIter = GraphIter<'a>;
467
468 fn into_iter(self) -> Self::IntoIter {
469 GraphIter {
470 inner: self.triples.iter(),
471 }
472 }
473}
474
475impl IntoIterator for Graph {
476 type Item = Triple;
477 type IntoIter = std::collections::btree_set::IntoIter<Triple>;
478
479 fn into_iter(self) -> Self::IntoIter {
480 self.triples.into_iter()
481 }
482}
483
484impl FromIterator<Triple> for Graph {
485 fn from_iter<I: IntoIterator<Item = Triple>>(iter: I) -> Self {
486 Graph {
487 triples: iter.into_iter().collect(),
488 }
489 }
490}
491
492impl Extend<Triple> for Graph {
493 fn extend<I: IntoIterator<Item = Triple>>(&mut self, iter: I) {
494 self.triples.extend(iter);
495 }
496}
497
// Every test here calls `par_*` methods that only exist with the `parallel`
// feature, so the module must not be compiled without it.
#[cfg(all(test, feature = "parallel"))]
mod tests {
    use super::*;
    use std::time::Instant;

    /// Builds a triple whose subject, predicate and object IRIs all end in
    /// `id`, so distinct ids always yield distinct triples.
    fn create_test_triple(id: usize) -> Triple {
        Triple::new(
            Subject::NamedNode(
                NamedNode::new(format!("http://subject/{id}")).expect("valid IRI from format"),
            ),
            Predicate::NamedNode(
                NamedNode::new(format!("http://predicate/{id}")).expect("valid IRI from format"),
            ),
            Object::NamedNode(
                NamedNode::new(format!("http://object/{id}")).expect("valid IRI from format"),
            ),
        )
    }

    /// `count` distinct triples with ids `0..count`.
    fn create_test_triples(count: usize) -> Vec<Triple> {
        (0..count).map(create_test_triple).collect()
    }

    #[test]
    fn test_par_insert_batch() {
        let mut graph = Graph::new();
        let triples = create_test_triples(10000);

        let start = Instant::now();
        let inserted = graph
            .par_insert_batch(triples.clone())
            .expect("parallel batch insert should succeed");
        let duration = start.elapsed();

        println!("Parallel insert of 10000 triples took: {duration:?}");
        assert_eq!(inserted, 10000);
        assert_eq!(graph.len(), 10000);

        for triple in &triples {
            assert!(graph.contains_triple(triple));
        }
    }

    #[test]
    fn test_par_insert_batch_with_duplicates() {
        let mut graph = Graph::new();
        let mut triples = create_test_triples(5000);
        // Ids 0..2500 appear twice; only one copy of each may count as inserted.
        triples.extend(create_test_triples(2500));

        let inserted = graph
            .par_insert_batch(triples)
            .expect("graph operation should succeed");

        assert_eq!(inserted, 5000);
        assert_eq!(graph.len(), 5000);
    }

    #[test]
    fn test_par_remove_batch() {
        let mut graph = Graph::new();
        let triples = create_test_triples(10000);
        graph.extend(triples.clone());

        // Remove every even-indexed triple.
        let to_remove: Vec<Triple> = triples.iter().step_by(2).cloned().collect();

        let start = Instant::now();
        let removed = graph
            .par_remove_batch(to_remove.clone())
            .expect("parallel batch remove should succeed");
        let duration = start.elapsed();

        println!("Parallel remove of 5000 triples took: {duration:?}");
        assert_eq!(removed, 5000);
        assert_eq!(graph.len(), 5000);

        for (i, triple) in triples.iter().enumerate() {
            if i % 2 == 0 {
                assert!(!graph.contains_triple(triple));
            } else {
                assert!(graph.contains_triple(triple));
            }
        }
    }

    #[test]
    fn test_par_query_batch() {
        let mut graph = Graph::new();
        graph.extend(create_test_triples(1000));

        let queried_subjects: Vec<Subject> = (0..100)
            .map(|i| {
                Subject::NamedNode(
                    NamedNode::new(format!("http://subject/{i}")).expect("valid IRI from format"),
                )
            })
            .collect();
        let queries: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)> = queried_subjects
            .iter()
            .cloned()
            .map(|subject| (Some(subject), None, None))
            .collect();

        let start = Instant::now();
        let results = graph
            .par_query_batch(queries)
            .expect("graph operation should succeed");
        let duration = start.elapsed();

        println!("Parallel query of 100 patterns took: {duration:?}");
        assert_eq!(results.len(), 100);

        // Each queried subject occurs in exactly one triple of the graph.
        assert!(results.iter().all(|matches| matches.len() == 1));

        // Compare as sets so the test does not depend on result ordering.
        let returned: BTreeSet<Subject> = results
            .iter()
            .flatten()
            .map(|triple| triple.subject().clone())
            .collect();
        let expected: BTreeSet<Subject> = queried_subjects.into_iter().collect();
        assert_eq!(returned, expected);
    }

    #[test]
    fn test_par_transform() {
        let mut graph = Graph::new();
        graph.extend(create_test_triples(1000));

        // Even ids get a new predicate; odd multiples of 3 are deleted; the rest
        // are returned unchanged.
        let transform_fn = |triple: &Triple| -> Option<Triple> {
            if let Subject::NamedNode(node) = triple.subject() {
                let uri = node.as_str();
                if let Some(id_str) = uri.strip_prefix("http://subject/") {
                    if let Ok(id) = id_str.parse::<usize>() {
                        if id % 2 == 0 {
                            return Some(Triple::new(
                                triple.subject().clone(),
                                Predicate::NamedNode(
                                    NamedNode::new("http://predicate/transformed")
                                        .expect("valid IRI"),
                                ),
                                triple.object().clone(),
                            ));
                        } else if id % 3 == 0 {
                            return None;
                        }
                    }
                }
            }
            Some(triple.clone())
        };

        let start = Instant::now();
        let (transformed, removed) = graph
            .par_transform(transform_fn)
            .expect("graph operation should succeed");
        let duration = start.elapsed();

        println!("Parallel transform took: {duration:?}");
        println!("Transformed: {transformed}, Removed: {removed}");

        // 500 even ids in 0..1000; odd multiples of 3 are 3, 9, ..., 999 (167 ids).
        assert_eq!(transformed, 500);
        assert_eq!(removed, 167);
        assert_eq!(graph.len(), 1000 - 167);

        let transformed_predicate = Predicate::NamedNode(
            NamedNode::new("http://predicate/transformed").expect("valid IRI"),
        );
        let transformed_count = graph
            .query_triples(None, Some(&transformed_predicate), None)
            .len();
        assert_eq!(transformed_count, 500);
    }

    #[test]
    fn test_par_count_patterns() {
        let mut graph = Graph::new();

        // 100 subjects x 10 predicates, with a unique object per triple.
        for i in 0..100 {
            for j in 0..10 {
                let triple = Triple::new(
                    Subject::NamedNode(
                        NamedNode::new(format!("http://subject/{i}"))
                            .expect("valid IRI from format"),
                    ),
                    Predicate::NamedNode(
                        NamedNode::new(format!("http://predicate/{j}"))
                            .expect("valid IRI from format"),
                    ),
                    Object::NamedNode(
                        NamedNode::new(format!("http://object/{}", i * 10 + j))
                            .expect("valid IRI from format"),
                    ),
                );
                graph.add_triple(triple);
            }
        }

        let patterns: Vec<_> = (0..10)
            .map(|i| {
                (
                    None,
                    Some(Predicate::NamedNode(
                        NamedNode::new(format!("http://predicate/{i}"))
                            .expect("valid IRI from format"),
                    )),
                    None,
                )
            })
            .collect();

        let counts = graph.par_count_patterns(patterns);

        // Guard against the loop below passing vacuously on an empty result.
        assert_eq!(counts.len(), 10);
        for count in counts {
            assert_eq!(count, 100);
        }
    }

    #[test]
    fn test_par_unique_terms() {
        let mut graph = Graph::new();
        graph.extend(create_test_triples(1000));

        let start = Instant::now();
        let (subjects, predicates, objects) = graph.par_unique_terms();
        let duration = start.elapsed();

        println!("Parallel unique terms extraction took: {duration:?}");

        assert_eq!(subjects.len(), 1000);
        assert_eq!(predicates.len(), 1000);
        assert_eq!(objects.len(), 1000);
    }

    #[test]
    fn test_par_iter() {
        let mut graph = Graph::new();
        graph.extend(create_test_triples(1000));

        let count = graph.par_iter().count();
        assert_eq!(count, 1000);

        // Ids 0..1000 ending in the digit 0: exactly 100 of them.
        let filtered: Vec<_> = graph
            .par_iter()
            .filter(|triple| {
                if let Subject::NamedNode(node) = triple.subject() {
                    node.as_str().ends_with("0")
                } else {
                    false
                }
            })
            .cloned()
            .collect();

        assert_eq!(filtered.len(), 100);
    }

    #[test]
    fn test_parallel_performance_comparison() {
        let triple_count = 50000;
        let triples = create_test_triples(triple_count);

        let mut graph1 = Graph::new();
        let start = Instant::now();
        for triple in &triples {
            graph1.add_triple(triple.clone());
        }
        let seq_duration = start.elapsed();

        let mut graph2 = Graph::new();
        let start = Instant::now();
        graph2
            .par_insert_batch(triples.clone())
            .expect("parallel batch insert should succeed");
        let par_duration = start.elapsed();

        println!("Performance comparison for {triple_count} triples:");
        println!("  Sequential insert: {seq_duration:?}");
        println!("  Parallel insert: {par_duration:?}");
        println!(
            "  Speedup: {:.2}x",
            seq_duration.as_secs_f64() / par_duration.as_secs_f64()
        );

        assert_eq!(graph1.len(), graph2.len());
    }

    #[test]
    fn test_empty_operations() {
        let mut graph = Graph::new();

        let inserted = graph
            .par_insert_batch(vec![])
            .expect("graph operation should succeed");
        assert_eq!(inserted, 0);

        let removed = graph
            .par_remove_batch(vec![])
            .expect("graph operation should succeed");
        assert_eq!(removed, 0);

        let results = graph
            .par_query_batch(vec![])
            .expect("graph operation should succeed");
        assert!(results.is_empty());

        let (transformed, removed) = graph
            .par_transform(|t| Some(t.clone()))
            .expect("parallel transform should succeed");
        assert_eq!(transformed, 0);
        assert_eq!(removed, 0);
    }
}
829
830#[derive(Debug, Clone)]
837pub struct ConcurrentGraph {
838 inner: Arc<parking_lot::RwLock<Graph>>,
839}
840
841impl ConcurrentGraph {
842 pub fn new() -> Self {
844 Self {
845 inner: Arc::new(parking_lot::RwLock::new(Graph::new())),
846 }
847 }
848
849 pub fn from_graph(graph: Graph) -> Self {
851 Self {
852 inner: Arc::new(parking_lot::RwLock::new(graph)),
853 }
854 }
855
856 pub fn add_triple(&self, triple: Triple) -> bool {
858 self.inner.write().add_triple(triple)
859 }
860
861 pub fn add_triples(&self, triples: Vec<Triple>) -> usize {
863 let mut graph = self.inner.write();
864 let mut added = 0;
865 for triple in triples {
866 if graph.add_triple(triple) {
867 added += 1;
868 }
869 }
870 added
871 }
872
873 pub fn remove_triple(&self, triple: &Triple) -> bool {
875 self.inner.write().remove_triple(triple)
876 }
877
878 pub fn contains_triple(&self, triple: &Triple) -> bool {
880 self.inner.read().contains_triple(triple)
881 }
882
883 pub fn query_triples(
885 &self,
886 subject: Option<&Subject>,
887 predicate: Option<&Predicate>,
888 object: Option<&Object>,
889 ) -> Vec<Triple> {
890 self.inner.read().query_triples(subject, predicate, object)
891 }
892
893 pub fn len(&self) -> usize {
895 self.inner.read().len()
896 }
897
898 pub fn is_empty(&self) -> bool {
900 self.inner.read().is_empty()
901 }
902
903 pub fn triples(&self) -> Vec<Triple> {
905 self.inner.read().triples()
906 }
907
908 pub fn merge(&self, other: &Graph) {
910 self.inner.write().merge(other)
911 }
912
913 pub fn merge_concurrent(&self, other: &ConcurrentGraph) {
915 let other_triples = other.triples();
916 let mut graph = self.inner.write();
917 for triple in other_triples {
918 graph.add_triple(triple);
919 }
920 }
921
922 pub fn union(&self, other: &Graph) -> Graph {
924 self.inner.read().union(other)
925 }
926
927 pub fn intersection(&self, other: &Graph) -> Graph {
929 self.inner.read().intersection(other)
930 }
931
932 pub fn clear(&self) {
934 self.inner.write().clear()
935 }
936
937 pub fn with_read<F, R>(&self, f: F) -> R
939 where
940 F: FnOnce(&Graph) -> R,
941 {
942 let graph = self.inner.read();
943 f(&graph)
944 }
945
946 pub fn with_write<F, R>(&self, f: F) -> R
948 where
949 F: FnOnce(&mut Graph) -> R,
950 {
951 let mut graph = self.inner.write();
952 f(&mut graph)
953 }
954
955 #[cfg(feature = "parallel")]
957 pub fn par_insert_batch(&self, triples: Vec<Triple>) -> Result<usize> {
958 self.inner.write().par_insert_batch(triples)
959 }
960
961 #[cfg(feature = "parallel")]
963 pub fn par_remove_batch(&self, triples: Vec<Triple>) -> Result<usize> {
964 self.inner.write().par_remove_batch(triples)
965 }
966
967 #[cfg(feature = "parallel")]
969 pub fn par_query_batch(
970 &self,
971 queries: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
972 ) -> Result<Vec<Vec<Triple>>> {
973 self.inner.read().par_query_batch(queries)
974 }
975
976 pub fn subjects(&self) -> BTreeSet<Subject> {
978 self.inner.read().subjects()
979 }
980
981 pub fn predicates(&self) -> BTreeSet<Predicate> {
983 self.inner.read().predicates()
984 }
985
986 pub fn objects(&self) -> BTreeSet<Object> {
988 self.inner.read().objects()
989 }
990}
991
992impl Default for ConcurrentGraph {
993 fn default() -> Self {
994 Self::new()
995 }
996}
997
/// Worker pool for bulk triple and graph processing.
///
/// With the `parallel` feature it owns a dedicated, named rayon thread pool.
/// Without that feature it holds only its configuration, and the processing
/// methods run sequentially on the calling thread.
// dead_code is allowed because, depending on the enabled features, not every
// field is necessarily read by the methods that are compiled in.
#[allow(dead_code)]
pub struct GraphThreadPool {
    /// Dedicated rayon workers (only with the `parallel` feature).
    #[cfg(feature = "parallel")]
    pool: rayon::ThreadPool,
    /// Configured upper bound for batch sizes.
    max_batch_size: usize,
}
1005
1006impl GraphThreadPool {
1007 pub fn new() -> Result<Self> {
1009 #[cfg(feature = "parallel")]
1010 {
1011 let pool = rayon::ThreadPoolBuilder::new()
1012 .num_threads(num_cpus::get())
1013 .thread_name(|index| format!("oxirs-graph-{index}"))
1014 .build()
1015 .map_err(|e| crate::OxirsError::ConcurrencyError(e.to_string()))?;
1016
1017 Ok(Self {
1018 pool,
1019 max_batch_size: 10_000,
1020 })
1021 }
1022 #[cfg(not(feature = "parallel"))]
1023 {
1024 Ok(Self {
1025 max_batch_size: 10_000,
1026 })
1027 }
1028 }
1029
1030 pub fn with_config(num_threads: usize, max_batch_size: usize) -> Result<Self> {
1032 #[cfg(feature = "parallel")]
1033 {
1034 let pool = rayon::ThreadPoolBuilder::new()
1035 .num_threads(num_threads)
1036 .thread_name(|index| format!("oxirs-graph-{index}"))
1037 .build()
1038 .map_err(|e| crate::OxirsError::ConcurrencyError(e.to_string()))?;
1039
1040 Ok(Self {
1041 pool,
1042 max_batch_size,
1043 })
1044 }
1045 #[cfg(not(feature = "parallel"))]
1046 {
1047 Ok(Self { max_batch_size })
1048 }
1049 }
1050
1051 pub fn process_triples<F, R>(&self, triples: Vec<Triple>, processor: F) -> Vec<R>
1053 where
1054 F: Fn(Triple) -> R + Sync + Send,
1055 R: Send,
1056 {
1057 #[cfg(feature = "parallel")]
1058 {
1059 self.pool
1060 .install(|| triples.into_par_iter().map(processor).collect())
1061 }
1062 #[cfg(not(feature = "parallel"))]
1063 {
1064 triples.into_iter().map(processor).collect()
1065 }
1066 }
1067
1068 pub fn process_graphs<F, R>(&self, graphs: Vec<Graph>, processor: F) -> Vec<R>
1070 where
1071 F: Fn(Graph) -> R + Sync + Send,
1072 R: Send,
1073 {
1074 #[cfg(feature = "parallel")]
1075 {
1076 self.pool
1077 .install(|| graphs.into_par_iter().map(processor).collect())
1078 }
1079 #[cfg(not(feature = "parallel"))]
1080 {
1081 graphs.into_iter().map(processor).collect()
1082 }
1083 }
1084
1085 pub fn merge_graphs(&self, graphs: Vec<Graph>) -> Graph {
1087 if graphs.is_empty() {
1088 return Graph::new();
1089 }
1090
1091 #[cfg(feature = "parallel")]
1092 {
1093 self.pool.install(|| {
1094 graphs.into_par_iter().reduce(Graph::new, |mut acc, graph| {
1095 acc.merge(&graph);
1096 acc
1097 })
1098 })
1099 }
1100 #[cfg(not(feature = "parallel"))]
1101 {
1102 graphs.into_iter().fold(Graph::new(), |mut acc, graph| {
1103 acc.merge(&graph);
1104 acc
1105 })
1106 }
1107 }
1108
1109 #[cfg(feature = "parallel")]
1111 pub fn inner(&self) -> &rayon::ThreadPool {
1112 &self.pool
1113 }
1114}
1115
1116impl Default for GraphThreadPool {
1117 fn default() -> Self {
1118 Self::new().expect("Failed to create default thread pool")
1119 }
1120}
1121
#[cfg(test)]
mod concurrent_tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    /// `http://example.org/s{i}` / `http://example.org/p` / `"value{i}"`.
    fn example_triple(i: usize) -> Triple {
        Triple::new(
            NamedNode::new(format!("http://example.org/s{i}")).expect("valid IRI from format"),
            NamedNode::new("http://example.org/p").expect("valid IRI"),
            Literal::new(format!("value{i}")),
        )
    }

    #[test]
    fn test_concurrent_graph_basic_operations() {
        let graph = ConcurrentGraph::new();

        let triple = Triple::new(
            NamedNode::new("http://example.org/s").expect("valid IRI"),
            NamedNode::new("http://example.org/p").expect("valid IRI"),
            Literal::new("test"),
        );

        assert!(graph.add_triple(triple.clone()));
        assert!(graph.contains_triple(&triple));
        assert_eq!(graph.len(), 1);
        assert!(!graph.is_empty());

        assert!(graph.remove_triple(&triple));
        assert!(!graph.contains_triple(&triple));
        assert_eq!(graph.len(), 0);
        assert!(graph.is_empty());
    }

    #[test]
    fn test_concurrent_access() {
        let graph = ConcurrentGraph::new();
        let counter = Arc::new(AtomicUsize::new(0));

        // 10 writer threads, each adding 100 distinct triples through a shared handle.
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let g = graph.clone();
                let c = counter.clone();
                thread::spawn(move || {
                    for j in 0..100 {
                        let triple = Triple::new(
                            NamedNode::new(format!("http://example.org/s{}", i * 100 + j))
                                .expect("valid IRI from format"),
                            NamedNode::new("http://example.org/p").expect("valid IRI"),
                            Literal::new(format!("value{j}")),
                        );

                        if g.add_triple(triple) {
                            c.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("thread should not panic");
        }

        // `join` synchronizes with each thread, so the relaxed counter is complete.
        assert_eq!(counter.load(Ordering::Relaxed), 1000);
        assert_eq!(graph.len(), 1000);
    }

    #[test]
    fn test_concurrent_graph_merge() {
        let graph1 = ConcurrentGraph::new();
        let graph2 = ConcurrentGraph::new();

        for i in 0..100 {
            let triple1 = Triple::new(
                NamedNode::new(format!("http://example.org/s1_{i}"))
                    .expect("valid IRI from format"),
                NamedNode::new("http://example.org/p").expect("valid IRI"),
                Literal::new(format!("value{i}")),
            );
            graph1.add_triple(triple1);

            let triple2 = Triple::new(
                NamedNode::new(format!("http://example.org/s2_{i}"))
                    .expect("valid IRI from format"),
                NamedNode::new("http://example.org/p").expect("valid IRI"),
                Literal::new(format!("value{i}")),
            );
            graph2.add_triple(triple2);
        }

        graph1.merge_concurrent(&graph2);

        assert_eq!(graph1.len(), 200);
        // The source graph is left untouched.
        assert_eq!(graph2.len(), 100);
    }

    #[test]
    fn test_graph_thread_pool() {
        let pool = GraphThreadPool::new().expect("thread pool creation should succeed");

        let triples: Vec<Triple> = (0..1000).map(example_triple).collect();

        let results = pool.process_triples(triples.clone(), |triple| triple.to_string().len());

        assert_eq!(results.len(), 1000);
        assert!(results.iter().all(|&len| len > 0));
    }

    #[test]
    fn test_merge_graphs_deduplicates() {
        let pool = GraphThreadPool::new().expect("thread pool creation should succeed");

        let make = |range: std::ops::Range<usize>| -> Graph { range.map(example_triple).collect() };

        // Overlapping ranges covering 0..100 exactly.
        let merged = pool.merge_graphs(vec![make(0..50), make(25..75), make(60..100)]);
        assert_eq!(merged.len(), 100);
        for i in 0..100 {
            assert!(merged.contains(&example_triple(i)));
        }

        assert!(pool.merge_graphs(Vec::new()).is_empty());
    }

    #[test]
    fn test_concurrent_with_operations() {
        let graph = ConcurrentGraph::new();

        let initial_len = graph.with_read(|g| g.len());
        assert_eq!(initial_len, 0);

        graph.with_write(|g| {
            for i in 0..10 {
                g.add_triple(example_triple(i));
            }
        });

        let final_len = graph.with_read(|g| g.len());
        assert_eq!(final_len, 10);
    }
}