1use super::epoch::{EpochManager, HazardPointer};
7use crate::model::{Object, Predicate, Subject, Triple};
8use crate::OxirsError;
9use crossbeam_epoch::Owned;
10use dashmap::DashMap;
11use std::collections::HashSet;
12use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
13use std::sync::Arc;
14
15struct GraphNode {
18 triples: Arc<DashMap<u64, Triple>>,
20 version: AtomicU64,
22 spo_index: Arc<DashMap<Subject, DashMap<Predicate, HashSet<Object>>>>,
24 #[allow(dead_code)]
26 pos_index: Arc<DashMap<Predicate, DashMap<Object, HashSet<Subject>>>>,
27 osp_index: Arc<DashMap<Object, DashMap<Subject, HashSet<Predicate>>>>,
29}
30
31impl GraphNode {
32 fn new() -> Self {
33 Self {
34 triples: Arc::new(DashMap::new()),
35 version: AtomicU64::new(0),
36 spo_index: Arc::new(DashMap::new()),
37 pos_index: Arc::new(DashMap::new()),
38 osp_index: Arc::new(DashMap::new()),
39 }
40 }
41
42 fn increment_version(&self) -> u64 {
43 self.version.fetch_add(1, Ordering::Release)
44 }
45}
46
47pub struct ConcurrentGraph {
49 graph: Arc<HazardPointer<GraphNode>>,
51 epoch_manager: Arc<EpochManager>,
53 triple_count: Arc<AtomicUsize>,
55 operation_count: Arc<AtomicU64>,
57}
58
59impl ConcurrentGraph {
60 pub fn new() -> Self {
62 let graph_node = GraphNode::new();
63 Self {
64 graph: Arc::new(HazardPointer::new(graph_node)),
65 epoch_manager: Arc::new(EpochManager::new()),
66 triple_count: Arc::new(AtomicUsize::new(0)),
67 operation_count: Arc::new(AtomicU64::new(0)),
68 }
69 }
70
71 pub fn insert(&self, triple: Triple) -> Result<bool, OxirsError> {
73 let guard = self.epoch_manager.pin();
74 self.operation_count.fetch_add(1, Ordering::Relaxed);
75
76 let triple_id = self.hash_triple(&triple);
78
79 let current = self.graph.load(&guard);
81 let graph_node = unsafe {
82 current
83 .as_ref()
84 .ok_or_else(|| OxirsError::Store("Graph not initialized".to_string()))?
85 };
86
87 if graph_node.triples.contains_key(&triple_id) {
89 return Ok(false);
90 }
91
92 graph_node.triples.insert(triple_id, triple.clone());
94
95 self.update_indices_insert(graph_node, &triple);
97
98 graph_node.increment_version();
100
101 self.triple_count.fetch_add(1, Ordering::Release);
103
104 Ok(true)
105 }
106
107 pub fn remove(&self, triple: &Triple) -> Result<bool, OxirsError> {
109 let guard = self.epoch_manager.pin();
110 self.operation_count.fetch_add(1, Ordering::Relaxed);
111
112 let triple_id = self.hash_triple(triple);
113
114 let current = self.graph.load(&guard);
116 let graph_node = unsafe {
117 current
118 .as_ref()
119 .ok_or_else(|| OxirsError::Store("Graph not initialized".to_string()))?
120 };
121
122 if graph_node.triples.remove(&triple_id).is_none() {
124 return Ok(false);
125 }
126
127 self.update_indices_remove(graph_node, triple);
129
130 graph_node.increment_version();
132
133 self.triple_count.fetch_sub(1, Ordering::Release);
135
136 Ok(true)
137 }
138
139 pub fn contains(&self, triple: &Triple) -> bool {
141 let guard = self.epoch_manager.pin();
142 let triple_id = self.hash_triple(triple);
143
144 if let Some(graph_node) = unsafe { self.graph.load(&guard).as_ref() } {
145 graph_node.triples.contains_key(&triple_id)
146 } else {
147 false
148 }
149 }
150
151 pub fn len(&self) -> usize {
153 self.triple_count.load(Ordering::Acquire)
154 }
155
156 pub fn is_empty(&self) -> bool {
158 self.len() == 0
159 }
160
161 pub fn iter(&self) -> impl Iterator<Item = Triple> + '_ {
163 let guard = self.epoch_manager.pin();
164 let snapshot = if let Some(graph_node) = unsafe { self.graph.load(&guard).as_ref() } {
165 graph_node
166 .triples
167 .iter()
168 .map(|entry| entry.value().clone())
169 .collect::<Vec<_>>()
170 } else {
171 Vec::new()
172 };
173
174 snapshot.into_iter()
175 }
176
177 pub fn match_pattern(
179 &self,
180 subject: Option<&Subject>,
181 predicate: Option<&Predicate>,
182 object: Option<&Object>,
183 ) -> Vec<Triple> {
184 let guard = self.epoch_manager.pin();
185 let graph_node = match unsafe { self.graph.load(&guard).as_ref() } {
186 Some(node) => node,
187 None => return Vec::new(),
188 };
189
190 match (subject, predicate, object) {
191 (Some(s), Some(p), Some(o)) => {
193 let triple = Triple::new(s.clone(), p.clone(), o.clone());
194 if self.contains(&triple) {
195 vec![triple]
196 } else {
197 Vec::new()
198 }
199 }
200 (Some(s), Some(p), None) => match graph_node.spo_index.get(s) {
202 Some(pred_map) => match pred_map.get(p) {
203 Some(obj_set) => obj_set
204 .iter()
205 .map(|o| Triple::new(s.clone(), p.clone(), o.clone()))
206 .collect(),
207 _ => Vec::new(),
208 },
209 _ => Vec::new(),
210 },
211 (Some(s), None, None) => match graph_node.spo_index.get(s) {
213 Some(pred_map) => pred_map
214 .iter()
215 .flat_map(|pred_entry| {
216 let p = pred_entry.key().clone();
217 let s = s.clone();
218 pred_entry
219 .value()
220 .iter()
221 .map(move |o| Triple::new(s.clone(), p.clone(), o.clone()))
222 .collect::<Vec<_>>()
223 })
224 .collect(),
225 _ => Vec::new(),
226 },
227 (None, None, Some(o)) => match graph_node.osp_index.get(o) {
229 Some(subj_map) => subj_map
230 .iter()
231 .flat_map(|subj_entry| {
232 let s = subj_entry.key().clone();
233 let o = o.clone();
234 subj_entry
235 .value()
236 .iter()
237 .map(move |p| Triple::new(s.clone(), p.clone(), o.clone()))
238 .collect::<Vec<_>>()
239 })
240 .collect(),
241 _ => Vec::new(),
242 },
243 _ => graph_node
245 .triples
246 .iter()
247 .map(|entry| entry.value().clone())
248 .filter(|t| {
249 subject.map_or(true, |s| t.subject() == s)
250 && predicate.map_or(true, |p| t.predicate() == p)
251 && object.map_or(true, |o| t.object() == o)
252 })
253 .collect(),
254 }
255 }
256
257 pub fn stats(&self) -> GraphStats {
259 GraphStats {
260 triple_count: self.len(),
261 operation_count: self.operation_count.load(Ordering::Relaxed),
262 current_epoch: self.epoch_manager.current_epoch(),
263 }
264 }
265
266 pub fn collect(&self) {
268 let guard = self.epoch_manager.pin();
269 self.epoch_manager.flush(&guard);
270 self.epoch_manager.advance();
271 }
272
273 fn hash_triple(&self, triple: &Triple) -> u64 {
276 use std::hash::{Hash, Hasher};
277 let mut hasher = ahash::AHasher::default();
278 triple.subject().hash(&mut hasher);
279 triple.predicate().hash(&mut hasher);
280 triple.object().hash(&mut hasher);
281 hasher.finish()
282 }
283
284 fn update_indices_insert(&self, graph_node: &GraphNode, triple: &Triple) {
285 graph_node
287 .spo_index
288 .entry(triple.subject().clone())
289 .or_default()
290 .entry(triple.predicate().clone())
291 .or_default()
292 .insert(triple.object().clone());
293
294 graph_node
296 .osp_index
297 .entry(triple.object().clone())
298 .or_default()
299 .entry(triple.subject().clone())
300 .or_default()
301 .insert(triple.predicate().clone());
302 }
303
304 fn update_indices_remove(&self, graph_node: &GraphNode, triple: &Triple) {
305 if let Some(pred_map) = graph_node.spo_index.get_mut(triple.subject()) {
307 if let Some(mut obj_set) = pred_map.get_mut(triple.predicate()) {
308 obj_set.remove(triple.object());
309 if obj_set.is_empty() {
310 drop(obj_set);
311 pred_map.remove(triple.predicate());
312 }
313 }
314 if pred_map.is_empty() {
315 drop(pred_map);
316 graph_node.spo_index.remove(triple.subject());
317 }
318 }
319
320 if let Some(subj_map) = graph_node.osp_index.get_mut(triple.object()) {
322 if let Some(mut pred_set) = subj_map.get_mut(triple.subject()) {
323 pred_set.remove(triple.predicate());
324 if pred_set.is_empty() {
325 drop(pred_set);
326 subj_map.remove(triple.subject());
327 }
328 }
329 if subj_map.is_empty() {
330 drop(subj_map);
331 graph_node.osp_index.remove(triple.object());
332 }
333 }
334 }
335}
336
337impl Default for ConcurrentGraph {
338 fn default() -> Self {
339 Self::new()
340 }
341}
342
/// Point-in-time counters reported by `ConcurrentGraph::stats`.
#[derive(Debug, Clone)]
pub struct GraphStats {
    /// Number of triples stored when the snapshot was taken.
    pub triple_count: usize,
    /// Number of insert/remove operations counted so far.
    pub operation_count: u64,
    /// Epoch value reported by the epoch manager.
    pub current_epoch: usize,
}
350
351impl ConcurrentGraph {
353 pub fn insert_batch(&self, triples: Vec<Triple>) -> Result<usize, OxirsError> {
358 if triples.len() < 100 {
360 let mut inserted = 0;
361 for triple in triples {
362 if self.insert(triple)? {
363 inserted += 1;
364 }
365 }
366 return Ok(inserted);
367 }
368
369 self.insert_batch_parallel(triples)
371 }
372
/// Inserts `triples` in parallel with rayon and returns how many were newly added.
///
/// The vector is consumed, so each triple is moved into `insert` instead of being cloned.
/// Every triple is attempted even if some inserts fail.
///
/// # Errors
///
/// Returns `OxirsError::Store` carrying the number of failed inserts when at least one
/// insert returned an error.
#[cfg(feature = "parallel")]
fn insert_batch_parallel(&self, triples: Vec<Triple>) -> Result<usize, OxirsError> {
    use rayon::prelude::*;

    let inserted_count = AtomicUsize::new(0);
    // Only the number of failures is reported, so a counter replaces a locked error list.
    let error_count = AtomicUsize::new(0);

    triples
        .into_par_iter()
        .for_each(|triple| match self.insert(triple) {
            Ok(true) => {
                inserted_count.fetch_add(1, Ordering::Relaxed);
            }
            Ok(false) => {}
            Err(_) => {
                error_count.fetch_add(1, Ordering::Relaxed);
            }
        });

    let failures = error_count.load(Ordering::Relaxed);
    if failures != 0 {
        return Err(OxirsError::Store(format!(
            "Batch insert failed with {} errors",
            failures
        )));
    }

    Ok(inserted_count.load(Ordering::Relaxed))
}
412
413 #[cfg(not(feature = "parallel"))]
415 fn insert_batch_parallel(&self, triples: Vec<Triple>) -> Result<usize, OxirsError> {
416 let mut inserted = 0;
417 for triple in triples {
418 if self.insert(triple)? {
419 inserted += 1;
420 }
421 }
422 Ok(inserted)
423 }
424
425 pub fn remove_batch(&self, triples: &[Triple]) -> Result<usize, OxirsError> {
430 if triples.len() < 100 {
432 let mut removed = 0;
433 for triple in triples {
434 if self.remove(triple)? {
435 removed += 1;
436 }
437 }
438 return Ok(removed);
439 }
440
441 self.remove_batch_parallel(triples)
443 }
444
/// Removes `triples` in parallel with rayon and returns how many were actually present.
///
/// Every triple is attempted even if some removals fail.
///
/// # Errors
///
/// Returns `OxirsError::Store` carrying the number of failed removals when at least one
/// removal returned an error.
#[cfg(feature = "parallel")]
fn remove_batch_parallel(&self, triples: &[Triple]) -> Result<usize, OxirsError> {
    use rayon::prelude::*;

    let removed_count = AtomicUsize::new(0);
    // Only the number of failures is reported, so a counter replaces a locked error list.
    let error_count = AtomicUsize::new(0);

    triples
        .par_iter()
        .for_each(|triple| match self.remove(triple) {
            Ok(true) => {
                removed_count.fetch_add(1, Ordering::Relaxed);
            }
            Ok(false) => {}
            Err(_) => {
                error_count.fetch_add(1, Ordering::Relaxed);
            }
        });

    let failures = error_count.load(Ordering::Relaxed);
    if failures != 0 {
        return Err(OxirsError::Store(format!(
            "Batch remove failed with {} errors",
            failures
        )));
    }

    Ok(removed_count.load(Ordering::Relaxed))
}
481
482 #[cfg(not(feature = "parallel"))]
484 fn remove_batch_parallel(&self, triples: &[Triple]) -> Result<usize, OxirsError> {
485 let mut removed = 0;
486 for triple in triples {
487 if self.remove(triple)? {
488 removed += 1;
489 }
490 }
491 Ok(removed)
492 }
493
494 pub fn rebuild_indices(&self) -> Result<(), OxirsError> {
499 let guard = self.epoch_manager.pin();
500
501 let current = self.graph.load(&guard);
503 let graph_node = unsafe {
504 current
505 .as_ref()
506 .ok_or_else(|| OxirsError::Store("Graph not initialized".to_string()))?
507 };
508
509 graph_node.spo_index.clear();
511 graph_node.pos_index.clear();
512 graph_node.osp_index.clear();
513
514 #[cfg(feature = "parallel")]
516 {
517 use rayon::prelude::*;
518
519 let triples: Vec<Triple> = graph_node
521 .triples
522 .iter()
523 .map(|entry| entry.value().clone())
524 .collect();
525
526 triples.par_iter().for_each(|triple| {
527 self.update_indices_insert(graph_node, triple);
528 });
529 }
530
531 #[cfg(not(feature = "parallel"))]
532 {
533 for entry in graph_node.triples.iter() {
534 let triple = entry.value();
535 self.update_indices_insert(graph_node, triple);
536 }
537 }
538
539 graph_node.increment_version();
541
542 Ok(())
543 }
544
545 pub fn clear(&self) -> Result<(), OxirsError> {
547 let guard = self.epoch_manager.pin();
548
549 let new_node = GraphNode::new();
551 self.graph.store(Owned::new(new_node), &guard);
552
553 self.triple_count.store(0, Ordering::Release);
555
556 self.collect();
558
559 Ok(())
560 }
561}
562
563#[cfg(test)]
564mod tests {
565 use super::*;
566 use crate::NamedNode;
567
568 fn create_test_triple(s: &str, p: &str, o: &str) -> Triple {
569 Triple::new(
570 Subject::NamedNode(NamedNode::new(s).expect("valid IRI")),
571 Predicate::NamedNode(NamedNode::new(p).expect("valid IRI")),
572 Object::NamedNode(NamedNode::new(o).expect("valid IRI")),
573 )
574 }
575
/// A first insert reports `true`, a repeat reports `false`, and the triple is counted once.
#[test]
fn test_concurrent_insert() {
    let graph = ConcurrentGraph::new();
    let triple = create_test_triple("http://s", "http://p", "http://o");

    let first = graph
        .insert(triple.clone())
        .expect("graph insert should succeed");
    assert!(first);

    let second = graph
        .insert(triple.clone())
        .expect("graph insert should succeed");
    assert!(!second);

    assert_eq!(graph.len(), 1);
    assert!(graph.contains(&triple));
}
590
/// Removing a stored triple succeeds once; a second removal reports `false`.
#[test]
fn test_concurrent_remove() {
    let graph = ConcurrentGraph::new();
    let triple = create_test_triple("http://s", "http://p", "http://o");

    let inserted = graph
        .insert(triple.clone())
        .expect("graph insert should succeed");
    assert!(inserted);

    let removed_first = graph
        .remove(&triple)
        .expect("graph operation should succeed");
    assert!(removed_first);

    let removed_again = graph
        .remove(&triple)
        .expect("graph operation should succeed");
    assert!(!removed_again);

    assert_eq!(graph.len(), 0);
    assert!(!graph.contains(&triple));
}
608
/// Exercises the subject-only, subject+predicate, and object-only query shapes.
#[test]
fn test_pattern_matching() {
    let graph = ConcurrentGraph::new();

    let fixtures = vec![
        create_test_triple("http://s1", "http://p1", "http://o1"),
        create_test_triple("http://s1", "http://p1", "http://o2"),
        create_test_triple("http://s1", "http://p2", "http://o1"),
        create_test_triple("http://s2", "http://p1", "http://o1"),
    ];
    for triple in fixtures {
        graph.insert(triple).expect("graph insert should succeed");
    }

    // Three of the four triples share subject s1.
    let s1 = Subject::NamedNode(NamedNode::new("http://s1").expect("valid IRI"));
    assert_eq!(graph.match_pattern(Some(&s1), None, None).len(), 3);

    // Two of those also share predicate p1.
    let p1 = Predicate::NamedNode(NamedNode::new("http://p1").expect("valid IRI"));
    assert_eq!(graph.match_pattern(Some(&s1), Some(&p1), None).len(), 2);

    // Three triples point at object o1.
    let o1 = Object::NamedNode(NamedNode::new("http://o1").expect("valid IRI"));
    assert_eq!(graph.match_pattern(None, None, Some(&o1)).len(), 3);
}
647
/// Four threads each insert 100 distinct triples; all of them must end up counted.
#[test]
fn test_concurrent_operations() {
    use std::thread;

    let num_threads = 4;
    let ops_per_thread = 100;
    let graph = Arc::new(ConcurrentGraph::new());

    let mut workers = Vec::new();
    for i in 0..num_threads {
        let shared = Arc::clone(&graph);
        workers.push(thread::spawn(move || {
            for j in 0..ops_per_thread {
                let subject = format!("http://s{i}");
                let predicate = format!("http://p{j}");
                let object = format!("http://o{}", i * ops_per_thread + j);
                shared
                    .insert(create_test_triple(&subject, &predicate, &object))
                    .expect("graph operation should succeed");
            }
        }));
    }

    for worker in workers {
        worker.join().expect("thread should not panic");
    }

    assert_eq!(graph.len(), num_threads * ops_per_thread);
}
680
/// Small batches: insert ten triples, then remove the first five.
#[test]
fn test_batch_operations() {
    let graph = ConcurrentGraph::new();

    let mut triples = Vec::with_capacity(10);
    for i in 0..10 {
        triples.push(create_test_triple(
            &format!("http://s{i}"),
            "http://p",
            "http://o",
        ));
    }

    let inserted = graph
        .insert_batch(triples.clone())
        .expect("batch insert should succeed");
    assert_eq!(inserted, 10);
    assert_eq!(graph.len(), 10);

    let (first_half, _rest) = triples.split_at(5);
    let removed = graph
        .remove_batch(first_half)
        .expect("graph operation should succeed");
    assert_eq!(removed, 5);
    assert_eq!(graph.len(), 5);
}
701
/// `clear` drops every stored triple and resets the count.
#[test]
fn test_clear() {
    let graph = ConcurrentGraph::new();

    (0..10).for_each(|i| {
        let triple = create_test_triple(&format!("http://s{i}"), "http://p", "http://o");
        graph
            .insert(triple)
            .expect("graph operation should succeed");
    });
    assert_eq!(graph.len(), 10);

    graph.clear().expect("graph operation should succeed");

    assert_eq!(graph.len(), 0);
    assert!(graph.is_empty());
}
718
/// A 200-triple batch is above the 100-triple threshold and takes the parallel entry point.
#[test]
fn test_parallel_batch_insert() {
    let graph = ConcurrentGraph::new();

    let mut triples: Vec<Triple> = Vec::with_capacity(200);
    for i in 0..200 {
        triples.push(create_test_triple(
            &format!("http://s{i}"),
            "http://p",
            "http://o",
        ));
    }

    let inserted = graph
        .insert_batch(triples)
        .expect("graph operation should succeed");

    assert_eq!(inserted, 200);
    assert_eq!(graph.len(), 200);
}
734
/// Inserts 200 triples and removes them again through the large-batch path.
#[test]
fn test_parallel_batch_remove() {
    let graph = ConcurrentGraph::new();

    let mut triples: Vec<Triple> = Vec::with_capacity(200);
    for i in 0..200 {
        triples.push(create_test_triple(
            &format!("http://s{i}"),
            "http://p",
            "http://o",
        ));
    }

    graph
        .insert_batch(triples.clone())
        .expect("batch insert should succeed");
    assert_eq!(graph.len(), 200);

    let removed = graph
        .remove_batch(triples.as_slice())
        .expect("graph operation should succeed");

    assert_eq!(removed, 200);
    assert_eq!(graph.len(), 0);
}
756
/// After a rebuild, a subject lookup still finds the single triple stored for it.
#[test]
fn test_rebuild_indices() {
    let graph = ConcurrentGraph::new();

    let mut triples: Vec<Triple> = Vec::with_capacity(50);
    for i in 0..50 {
        triples.push(create_test_triple(
            &format!("http://s{i}"),
            "http://p",
            "http://o",
        ));
    }

    graph
        .insert_batch(triples)
        .expect("graph operation should succeed");
    assert_eq!(graph.len(), 50);

    graph
        .rebuild_indices()
        .expect("graph operation should succeed");

    let subject = Subject::NamedNode(NamedNode::new("http://s0").expect("valid IRI"));
    let found = graph.match_pattern(Some(&subject), None, None);
    assert_eq!(found.len(), 1);
}
781
/// A 50-triple batch is below the 100-triple threshold and is inserted sequentially.
#[test]
fn test_small_batch_sequential() {
    let graph = ConcurrentGraph::new();

    let mut triples: Vec<Triple> = Vec::with_capacity(50);
    for i in 0..50 {
        triples.push(create_test_triple(
            &format!("http://s{i}"),
            "http://p",
            "http://o",
        ));
    }

    let inserted = graph
        .insert_batch(triples)
        .expect("graph operation should succeed");

    assert_eq!(inserted, 50);
    assert_eq!(graph.len(), 50);
}
797}