Skip to main content

oxirs_core/concurrent/
lock_free_graph.rs

1//! Lock-free graph implementation for high-performance concurrent access
2//!
3//! This module provides a wait-free reader, lock-free writer graph structure
4//! using epoch-based memory reclamation and atomic operations.
5
6use super::epoch::{EpochManager, HazardPointer};
7use crate::model::{Object, Predicate, Subject, Triple};
8use crate::OxirsError;
9use crossbeam_epoch::Owned;
10use dashmap::DashMap;
11use std::collections::HashSet;
12use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
13use std::sync::Arc;
14
15/// Index type for fast triple lookups
16/// A lock-free graph node containing triples
17struct GraphNode {
18    /// The triples stored in this node
19    triples: Arc<DashMap<u64, Triple>>,
20    /// Version number for optimistic concurrency control
21    version: AtomicU64,
22    /// Index for SPO (Subject-Predicate-Object) lookups
23    spo_index: Arc<DashMap<Subject, DashMap<Predicate, HashSet<Object>>>>,
24    /// Index for POS (Predicate-Object-Subject) lookups
25    #[allow(dead_code)]
26    pos_index: Arc<DashMap<Predicate, DashMap<Object, HashSet<Subject>>>>,
27    /// Index for OSP (Object-Subject-Predicate) lookups
28    osp_index: Arc<DashMap<Object, DashMap<Subject, HashSet<Predicate>>>>,
29}
30
31impl GraphNode {
32    fn new() -> Self {
33        Self {
34            triples: Arc::new(DashMap::new()),
35            version: AtomicU64::new(0),
36            spo_index: Arc::new(DashMap::new()),
37            pos_index: Arc::new(DashMap::new()),
38            osp_index: Arc::new(DashMap::new()),
39        }
40    }
41
42    fn increment_version(&self) -> u64 {
43        self.version.fetch_add(1, Ordering::Release)
44    }
45}
46
47/// A concurrent, lock-free graph implementation
48pub struct ConcurrentGraph {
49    /// The current graph state
50    graph: Arc<HazardPointer<GraphNode>>,
51    /// Epoch manager for memory reclamation
52    epoch_manager: Arc<EpochManager>,
53    /// Triple counter
54    triple_count: Arc<AtomicUsize>,
55    /// Operation counter for metrics
56    operation_count: Arc<AtomicU64>,
57}
58
59impl ConcurrentGraph {
60    /// Create a new concurrent graph
61    pub fn new() -> Self {
62        let graph_node = GraphNode::new();
63        Self {
64            graph: Arc::new(HazardPointer::new(graph_node)),
65            epoch_manager: Arc::new(EpochManager::new()),
66            triple_count: Arc::new(AtomicUsize::new(0)),
67            operation_count: Arc::new(AtomicU64::new(0)),
68        }
69    }
70
71    /// Insert a triple into the graph (lock-free)
72    pub fn insert(&self, triple: Triple) -> Result<bool, OxirsError> {
73        let guard = self.epoch_manager.pin();
74        self.operation_count.fetch_add(1, Ordering::Relaxed);
75
76        // Generate a unique ID for this triple
77        let triple_id = self.hash_triple(&triple);
78
79        // Load current graph state
80        let current = self.graph.load(&guard);
81        let graph_node = unsafe {
82            current
83                .as_ref()
84                .ok_or_else(|| OxirsError::Store("Graph not initialized".to_string()))?
85        };
86
87        // Check if triple already exists (wait-free read)
88        if graph_node.triples.contains_key(&triple_id) {
89            return Ok(false);
90        }
91
92        // Insert into main storage
93        graph_node.triples.insert(triple_id, triple.clone());
94
95        // Update indices
96        self.update_indices_insert(graph_node, &triple);
97
98        // Increment version
99        graph_node.increment_version();
100
101        // Update counter
102        self.triple_count.fetch_add(1, Ordering::Release);
103
104        Ok(true)
105    }
106
107    /// Remove a triple from the graph (lock-free)
108    pub fn remove(&self, triple: &Triple) -> Result<bool, OxirsError> {
109        let guard = self.epoch_manager.pin();
110        self.operation_count.fetch_add(1, Ordering::Relaxed);
111
112        let triple_id = self.hash_triple(triple);
113
114        // Load current graph state
115        let current = self.graph.load(&guard);
116        let graph_node = unsafe {
117            current
118                .as_ref()
119                .ok_or_else(|| OxirsError::Store("Graph not initialized".to_string()))?
120        };
121
122        // Remove from main storage
123        if graph_node.triples.remove(&triple_id).is_none() {
124            return Ok(false);
125        }
126
127        // Update indices
128        self.update_indices_remove(graph_node, triple);
129
130        // Increment version
131        graph_node.increment_version();
132
133        // Update counter
134        self.triple_count.fetch_sub(1, Ordering::Release);
135
136        Ok(true)
137    }
138
139    /// Check if a triple exists (wait-free)
140    pub fn contains(&self, triple: &Triple) -> bool {
141        let guard = self.epoch_manager.pin();
142        let triple_id = self.hash_triple(triple);
143
144        if let Some(graph_node) = unsafe { self.graph.load(&guard).as_ref() } {
145            graph_node.triples.contains_key(&triple_id)
146        } else {
147            false
148        }
149    }
150
151    /// Get the number of triples (wait-free)
152    pub fn len(&self) -> usize {
153        self.triple_count.load(Ordering::Acquire)
154    }
155
156    /// Check if the graph is empty (wait-free)
157    pub fn is_empty(&self) -> bool {
158        self.len() == 0
159    }
160
161    /// Iterate over all triples (wait-free snapshot)
162    pub fn iter(&self) -> impl Iterator<Item = Triple> + '_ {
163        let guard = self.epoch_manager.pin();
164        let snapshot = if let Some(graph_node) = unsafe { self.graph.load(&guard).as_ref() } {
165            graph_node
166                .triples
167                .iter()
168                .map(|entry| entry.value().clone())
169                .collect::<Vec<_>>()
170        } else {
171            Vec::new()
172        };
173
174        snapshot.into_iter()
175    }
176
177    /// Find triples matching a pattern (wait-free)
178    pub fn match_pattern(
179        &self,
180        subject: Option<&Subject>,
181        predicate: Option<&Predicate>,
182        object: Option<&Object>,
183    ) -> Vec<Triple> {
184        let guard = self.epoch_manager.pin();
185        let graph_node = match unsafe { self.graph.load(&guard).as_ref() } {
186            Some(node) => node,
187            None => return Vec::new(),
188        };
189
190        match (subject, predicate, object) {
191            // All components specified
192            (Some(s), Some(p), Some(o)) => {
193                let triple = Triple::new(s.clone(), p.clone(), o.clone());
194                if self.contains(&triple) {
195                    vec![triple]
196                } else {
197                    Vec::new()
198                }
199            }
200            // Subject and predicate specified
201            (Some(s), Some(p), None) => match graph_node.spo_index.get(s) {
202                Some(pred_map) => match pred_map.get(p) {
203                    Some(obj_set) => obj_set
204                        .iter()
205                        .map(|o| Triple::new(s.clone(), p.clone(), o.clone()))
206                        .collect(),
207                    _ => Vec::new(),
208                },
209                _ => Vec::new(),
210            },
211            // Only subject specified
212            (Some(s), None, None) => match graph_node.spo_index.get(s) {
213                Some(pred_map) => pred_map
214                    .iter()
215                    .flat_map(|pred_entry| {
216                        let p = pred_entry.key().clone();
217                        let s = s.clone();
218                        pred_entry
219                            .value()
220                            .iter()
221                            .map(move |o| Triple::new(s.clone(), p.clone(), o.clone()))
222                            .collect::<Vec<_>>()
223                    })
224                    .collect(),
225                _ => Vec::new(),
226            },
227            // Object specified
228            (None, None, Some(o)) => match graph_node.osp_index.get(o) {
229                Some(subj_map) => subj_map
230                    .iter()
231                    .flat_map(|subj_entry| {
232                        let s = subj_entry.key().clone();
233                        let o = o.clone();
234                        subj_entry
235                            .value()
236                            .iter()
237                            .map(move |p| Triple::new(s.clone(), p.clone(), o.clone()))
238                            .collect::<Vec<_>>()
239                    })
240                    .collect(),
241                _ => Vec::new(),
242            },
243            // Other patterns - scan all triples
244            _ => graph_node
245                .triples
246                .iter()
247                .map(|entry| entry.value().clone())
248                .filter(|t| {
249                    subject.map_or(true, |s| t.subject() == s)
250                        && predicate.map_or(true, |p| t.predicate() == p)
251                        && object.map_or(true, |o| t.object() == o)
252                })
253                .collect(),
254        }
255    }
256
257    /// Get statistics about the graph
258    pub fn stats(&self) -> GraphStats {
259        GraphStats {
260            triple_count: self.len(),
261            operation_count: self.operation_count.load(Ordering::Relaxed),
262            current_epoch: self.epoch_manager.current_epoch(),
263        }
264    }
265
266    /// Force memory reclamation
267    pub fn collect(&self) {
268        let guard = self.epoch_manager.pin();
269        self.epoch_manager.flush(&guard);
270        self.epoch_manager.advance();
271    }
272
273    // Helper methods
274
275    fn hash_triple(&self, triple: &Triple) -> u64 {
276        use std::hash::{Hash, Hasher};
277        let mut hasher = ahash::AHasher::default();
278        triple.subject().hash(&mut hasher);
279        triple.predicate().hash(&mut hasher);
280        triple.object().hash(&mut hasher);
281        hasher.finish()
282    }
283
284    fn update_indices_insert(&self, graph_node: &GraphNode, triple: &Triple) {
285        // Update SPO index
286        graph_node
287            .spo_index
288            .entry(triple.subject().clone())
289            .or_default()
290            .entry(triple.predicate().clone())
291            .or_default()
292            .insert(triple.object().clone());
293
294        // Update OSP index
295        graph_node
296            .osp_index
297            .entry(triple.object().clone())
298            .or_default()
299            .entry(triple.subject().clone())
300            .or_default()
301            .insert(triple.predicate().clone());
302    }
303
304    fn update_indices_remove(&self, graph_node: &GraphNode, triple: &Triple) {
305        // Update SPO index
306        if let Some(pred_map) = graph_node.spo_index.get_mut(triple.subject()) {
307            if let Some(mut obj_set) = pred_map.get_mut(triple.predicate()) {
308                obj_set.remove(triple.object());
309                if obj_set.is_empty() {
310                    drop(obj_set);
311                    pred_map.remove(triple.predicate());
312                }
313            }
314            if pred_map.is_empty() {
315                drop(pred_map);
316                graph_node.spo_index.remove(triple.subject());
317            }
318        }
319
320        // Update OSP index
321        if let Some(subj_map) = graph_node.osp_index.get_mut(triple.object()) {
322            if let Some(mut pred_set) = subj_map.get_mut(triple.subject()) {
323                pred_set.remove(triple.predicate());
324                if pred_set.is_empty() {
325                    drop(pred_set);
326                    subj_map.remove(triple.subject());
327                }
328            }
329            if subj_map.is_empty() {
330                drop(subj_map);
331                graph_node.osp_index.remove(triple.object());
332            }
333        }
334    }
335}
336
337impl Default for ConcurrentGraph {
338    fn default() -> Self {
339        Self::new()
340    }
341}
342
343/// Statistics about the concurrent graph
344#[derive(Debug, Clone)]
345pub struct GraphStats {
346    pub triple_count: usize,
347    pub operation_count: u64,
348    pub current_epoch: usize,
349}
350
351/// Batch operations for improved performance
352impl ConcurrentGraph {
353    /// Insert multiple triples in a batch
354    ///
355    /// For small batches (<100), uses sequential insertion.
356    /// For large batches, uses parallel insertion with concurrent index updates.
357    pub fn insert_batch(&self, triples: Vec<Triple>) -> Result<usize, OxirsError> {
358        // For small batches, use sequential insertion
359        if triples.len() < 100 {
360            let mut inserted = 0;
361            for triple in triples {
362                if self.insert(triple)? {
363                    inserted += 1;
364                }
365            }
366            return Ok(inserted);
367        }
368
369        // For large batches, use parallel insertion
370        self.insert_batch_parallel(triples)
371    }
372
373    /// Parallel batch insertion for large datasets
374    ///
375    /// Uses Rayon for parallel processing and concurrent index updates.
376    /// This provides significant speedup for bulk loading operations.
377    #[cfg(feature = "parallel")]
378    fn insert_batch_parallel(&self, triples: Vec<Triple>) -> Result<usize, OxirsError> {
379        use rayon::prelude::*;
380        use std::sync::atomic::AtomicUsize;
381
382        let inserted_count = AtomicUsize::new(0);
383        let errors: Arc<parking_lot::Mutex<Vec<OxirsError>>> =
384            Arc::new(parking_lot::Mutex::new(Vec::new()));
385
386        // Process in parallel
387        triples.par_iter().for_each(|triple| {
388            match self.insert(triple.clone()) {
389                Ok(true) => {
390                    inserted_count.fetch_add(1, Ordering::Relaxed);
391                }
392                Ok(false) => {
393                    // Already exists, not an error
394                }
395                Err(e) => {
396                    errors.lock().push(e);
397                }
398            }
399        });
400
401        // Check for errors
402        let error_vec = errors.lock();
403        if !error_vec.is_empty() {
404            return Err(OxirsError::Store(format!(
405                "Batch insert failed with {} errors",
406                error_vec.len()
407            )));
408        }
409
410        Ok(inserted_count.load(Ordering::Relaxed))
411    }
412
413    /// Sequential fallback for parallel batch insertion
414    #[cfg(not(feature = "parallel"))]
415    fn insert_batch_parallel(&self, triples: Vec<Triple>) -> Result<usize, OxirsError> {
416        let mut inserted = 0;
417        for triple in triples {
418            if self.insert(triple)? {
419                inserted += 1;
420            }
421        }
422        Ok(inserted)
423    }
424
425    /// Remove multiple triples in a batch
426    ///
427    /// For small batches (<100), uses sequential removal.
428    /// For large batches, uses parallel removal with concurrent index updates.
429    pub fn remove_batch(&self, triples: &[Triple]) -> Result<usize, OxirsError> {
430        // For small batches, use sequential removal
431        if triples.len() < 100 {
432            let mut removed = 0;
433            for triple in triples {
434                if self.remove(triple)? {
435                    removed += 1;
436                }
437            }
438            return Ok(removed);
439        }
440
441        // For large batches, use parallel removal
442        self.remove_batch_parallel(triples)
443    }
444
445    /// Parallel batch removal for large datasets
446    #[cfg(feature = "parallel")]
447    fn remove_batch_parallel(&self, triples: &[Triple]) -> Result<usize, OxirsError> {
448        use rayon::prelude::*;
449        use std::sync::atomic::AtomicUsize;
450
451        let removed_count = AtomicUsize::new(0);
452        let errors: Arc<parking_lot::Mutex<Vec<OxirsError>>> =
453            Arc::new(parking_lot::Mutex::new(Vec::new()));
454
455        // Process in parallel
456        triples.par_iter().for_each(|triple| {
457            match self.remove(triple) {
458                Ok(true) => {
459                    removed_count.fetch_add(1, Ordering::Relaxed);
460                }
461                Ok(false) => {
462                    // Doesn't exist, not an error
463                }
464                Err(e) => {
465                    errors.lock().push(e);
466                }
467            }
468        });
469
470        // Check for errors
471        let error_vec = errors.lock();
472        if !error_vec.is_empty() {
473            return Err(OxirsError::Store(format!(
474                "Batch remove failed with {} errors",
475                error_vec.len()
476            )));
477        }
478
479        Ok(removed_count.load(Ordering::Relaxed))
480    }
481
482    /// Sequential fallback for parallel batch removal
483    #[cfg(not(feature = "parallel"))]
484    fn remove_batch_parallel(&self, triples: &[Triple]) -> Result<usize, OxirsError> {
485        let mut removed = 0;
486        for triple in triples {
487            if self.remove(triple)? {
488                removed += 1;
489            }
490        }
491        Ok(removed)
492    }
493
494    /// Rebuild all indices from scratch (useful for optimization after many operations)
495    ///
496    /// This operation is expensive but can improve query performance by defragmenting
497    /// the index structures and removing empty entries.
498    pub fn rebuild_indices(&self) -> Result<(), OxirsError> {
499        let guard = self.epoch_manager.pin();
500
501        // Load current graph state
502        let current = self.graph.load(&guard);
503        let graph_node = unsafe {
504            current
505                .as_ref()
506                .ok_or_else(|| OxirsError::Store("Graph not initialized".to_string()))?
507        };
508
509        // Clear existing indices
510        graph_node.spo_index.clear();
511        graph_node.pos_index.clear();
512        graph_node.osp_index.clear();
513
514        // Rebuild from triples
515        #[cfg(feature = "parallel")]
516        {
517            use rayon::prelude::*;
518
519            // Collect triples into a vector for parallel processing
520            let triples: Vec<Triple> = graph_node
521                .triples
522                .iter()
523                .map(|entry| entry.value().clone())
524                .collect();
525
526            triples.par_iter().for_each(|triple| {
527                self.update_indices_insert(graph_node, triple);
528            });
529        }
530
531        #[cfg(not(feature = "parallel"))]
532        {
533            for entry in graph_node.triples.iter() {
534                let triple = entry.value();
535                self.update_indices_insert(graph_node, triple);
536            }
537        }
538
539        // Increment version
540        graph_node.increment_version();
541
542        Ok(())
543    }
544
545    /// Clear all triples from the graph
546    pub fn clear(&self) -> Result<(), OxirsError> {
547        let guard = self.epoch_manager.pin();
548
549        // Create new empty graph node
550        let new_node = GraphNode::new();
551        self.graph.store(Owned::new(new_node), &guard);
552
553        // Reset counter
554        self.triple_count.store(0, Ordering::Release);
555
556        // Force collection of old data
557        self.collect();
558
559        Ok(())
560    }
561}
562
563#[cfg(test)]
564mod tests {
565    use super::*;
566    use crate::NamedNode;
567
568    fn create_test_triple(s: &str, p: &str, o: &str) -> Triple {
569        Triple::new(
570            Subject::NamedNode(NamedNode::new(s).expect("valid IRI")),
571            Predicate::NamedNode(NamedNode::new(p).expect("valid IRI")),
572            Object::NamedNode(NamedNode::new(o).expect("valid IRI")),
573        )
574    }
575
576    #[test]
577    fn test_concurrent_insert() {
578        let graph = ConcurrentGraph::new();
579        let triple = create_test_triple("http://s", "http://p", "http://o");
580
581        assert!(graph
582            .insert(triple.clone())
583            .expect("graph insert should succeed"));
584        assert!(!graph
585            .insert(triple.clone())
586            .expect("graph insert should succeed"));
587        assert_eq!(graph.len(), 1);
588        assert!(graph.contains(&triple));
589    }
590
591    #[test]
592    fn test_concurrent_remove() {
593        let graph = ConcurrentGraph::new();
594        let triple = create_test_triple("http://s", "http://p", "http://o");
595
596        assert!(graph
597            .insert(triple.clone())
598            .expect("graph insert should succeed"));
599        assert!(graph
600            .remove(&triple)
601            .expect("graph operation should succeed"));
602        assert!(!graph
603            .remove(&triple)
604            .expect("graph operation should succeed"));
605        assert_eq!(graph.len(), 0);
606        assert!(!graph.contains(&triple));
607    }
608
609    #[test]
610    fn test_pattern_matching() {
611        let graph = ConcurrentGraph::new();
612
613        // Insert test data
614        let t1 = create_test_triple("http://s1", "http://p1", "http://o1");
615        let t2 = create_test_triple("http://s1", "http://p1", "http://o2");
616        let t3 = create_test_triple("http://s1", "http://p2", "http://o1");
617        let t4 = create_test_triple("http://s2", "http://p1", "http://o1");
618
619        graph
620            .insert(t1.clone())
621            .expect("graph insert should succeed");
622        graph
623            .insert(t2.clone())
624            .expect("graph insert should succeed");
625        graph
626            .insert(t3.clone())
627            .expect("graph insert should succeed");
628        graph
629            .insert(t4.clone())
630            .expect("graph insert should succeed");
631
632        // Test subject pattern
633        let s1 = Subject::NamedNode(NamedNode::new("http://s1").expect("valid IRI"));
634        let matches = graph.match_pattern(Some(&s1), None, None);
635        assert_eq!(matches.len(), 3);
636
637        // Test subject-predicate pattern
638        let p1 = Predicate::NamedNode(NamedNode::new("http://p1").expect("valid IRI"));
639        let matches = graph.match_pattern(Some(&s1), Some(&p1), None);
640        assert_eq!(matches.len(), 2);
641
642        // Test object pattern
643        let o1 = Object::NamedNode(NamedNode::new("http://o1").expect("valid IRI"));
644        let matches = graph.match_pattern(None, None, Some(&o1));
645        assert_eq!(matches.len(), 3);
646    }
647
648    #[test]
649    fn test_concurrent_operations() {
650        use std::thread;
651
652        let graph = Arc::new(ConcurrentGraph::new());
653        let num_threads = 4;
654        let ops_per_thread = 100;
655
656        let handles: Vec<_> = (0..num_threads)
657            .map(|i| {
658                let graph = graph.clone();
659                thread::spawn(move || {
660                    for j in 0..ops_per_thread {
661                        let triple = create_test_triple(
662                            &format!("http://s{i}"),
663                            &format!("http://p{j}"),
664                            &format!("http://o{}", i * ops_per_thread + j),
665                        );
666                        graph
667                            .insert(triple)
668                            .expect("graph operation should succeed");
669                    }
670                })
671            })
672            .collect();
673
674        for handle in handles {
675            handle.join().expect("thread should not panic");
676        }
677
678        assert_eq!(graph.len(), num_threads * ops_per_thread);
679    }
680
681    #[test]
682    fn test_batch_operations() {
683        let graph = ConcurrentGraph::new();
684
685        let triples: Vec<_> = (0..10)
686            .map(|i| create_test_triple(&format!("http://s{i}"), "http://p", "http://o"))
687            .collect();
688
689        let inserted = graph
690            .insert_batch(triples.clone())
691            .expect("batch insert should succeed");
692        assert_eq!(inserted, 10);
693        assert_eq!(graph.len(), 10);
694
695        let removed = graph
696            .remove_batch(&triples[0..5])
697            .expect("graph operation should succeed");
698        assert_eq!(removed, 5);
699        assert_eq!(graph.len(), 5);
700    }
701
702    #[test]
703    fn test_clear() {
704        let graph = ConcurrentGraph::new();
705
706        for i in 0..10 {
707            let triple = create_test_triple(&format!("http://s{i}"), "http://p", "http://o");
708            graph
709                .insert(triple)
710                .expect("graph operation should succeed");
711        }
712
713        assert_eq!(graph.len(), 10);
714        graph.clear().expect("graph operation should succeed");
715        assert_eq!(graph.len(), 0);
716        assert!(graph.is_empty());
717    }
718
719    #[test]
720    fn test_parallel_batch_insert() {
721        let graph = ConcurrentGraph::new();
722
723        // Create a large batch (>100 to trigger parallel mode)
724        let triples: Vec<Triple> = (0..200)
725            .map(|i| create_test_triple(&format!("http://s{i}"), "http://p", "http://o"))
726            .collect();
727
728        let inserted = graph
729            .insert_batch(triples)
730            .expect("graph operation should succeed");
731        assert_eq!(inserted, 200);
732        assert_eq!(graph.len(), 200);
733    }
734
735    #[test]
736    fn test_parallel_batch_remove() {
737        let graph = ConcurrentGraph::new();
738
739        // Insert test data
740        let triples: Vec<Triple> = (0..200)
741            .map(|i| create_test_triple(&format!("http://s{i}"), "http://p", "http://o"))
742            .collect();
743
744        graph
745            .insert_batch(triples.clone())
746            .expect("batch insert should succeed");
747        assert_eq!(graph.len(), 200);
748
749        // Remove in batch
750        let removed = graph
751            .remove_batch(&triples)
752            .expect("graph operation should succeed");
753        assert_eq!(removed, 200);
754        assert_eq!(graph.len(), 0);
755    }
756
757    #[test]
758    fn test_rebuild_indices() {
759        let graph = ConcurrentGraph::new();
760
761        // Insert triples
762        let triples: Vec<Triple> = (0..50)
763            .map(|i| create_test_triple(&format!("http://s{i}"), "http://p", "http://o"))
764            .collect();
765
766        graph
767            .insert_batch(triples)
768            .expect("graph operation should succeed");
769        assert_eq!(graph.len(), 50);
770
771        // Rebuild indices
772        graph
773            .rebuild_indices()
774            .expect("graph operation should succeed");
775
776        // Verify queries still work
777        let s = Subject::NamedNode(NamedNode::new("http://s0").expect("valid IRI"));
778        let matches = graph.match_pattern(Some(&s), None, None);
779        assert_eq!(matches.len(), 1);
780    }
781
782    #[test]
783    fn test_small_batch_sequential() {
784        let graph = ConcurrentGraph::new();
785
786        // Small batch should use sequential insertion
787        let triples: Vec<Triple> = (0..50)
788            .map(|i| create_test_triple(&format!("http://s{i}"), "http://p", "http://o"))
789            .collect();
790
791        let inserted = graph
792            .insert_batch(triples)
793            .expect("graph operation should succeed");
794        assert_eq!(inserted, 50);
795        assert_eq!(graph.len(), 50);
796    }
797}