oxirs_core/concurrent/
mod.rs

1//! Concurrent data structures for high-performance graph operations
2//!
3//! This module provides lock-free and wait-free data structures for
4//! concurrent access to RDF graphs, using epoch-based memory reclamation
5//! and atomic operations.
6
7pub mod batch_builder;
8pub mod epoch;
9pub mod lock_free_graph;
10pub mod parallel_batch;
11
12pub use batch_builder::{BatchBuilder, BatchBuilderConfig, BatchBuilderStats, CoalescingStrategy};
13pub use epoch::{EpochManager, HazardPointer, VersionedPointer};
14pub use lock_free_graph::{ConcurrentGraph, GraphStats};
15pub use parallel_batch::{
16    BatchConfig, BatchOperation, BatchStats, BatchStatsSummary, ParallelBatchProcessor,
17    ProgressCallback,
18};
19
20/// Re-export crossbeam epoch types for convenience
21pub use crossbeam_epoch::{pin, Guard};
22
23#[cfg(test)]
24mod tests {
25    use super::*;
26    use crate::model::{NamedNode, Object, Predicate, Subject, Triple};
27    use std::sync::Arc;
28    use std::thread;
29    use std::time::Instant;
30
31    fn create_test_triple(id: usize) -> Triple {
32        Triple::new(
33            Subject::NamedNode(NamedNode::new(format!("http://subject/{id}")).unwrap()),
34            Predicate::NamedNode(NamedNode::new(format!("http://predicate/{id}")).unwrap()),
35            Object::NamedNode(NamedNode::new(format!("http://object/{id}")).unwrap()),
36        )
37    }
38
39    #[test]
40    fn test_concurrent_stress() {
41        let graph = Arc::new(ConcurrentGraph::new());
42        let num_threads = 8;
43        let ops_per_thread = 1000;
44
45        let start = Instant::now();
46
47        // Spawn writer threads
48        let writer_handles: Vec<_> = (0..num_threads / 2)
49            .map(|thread_id| {
50                let graph = graph.clone();
51                thread::spawn(move || {
52                    for i in 0..ops_per_thread {
53                        let id = thread_id * ops_per_thread + i;
54                        let triple = create_test_triple(id);
55                        graph.insert(triple).unwrap();
56                    }
57                })
58            })
59            .collect();
60
61        // Spawn reader threads
62        let reader_handles: Vec<_> = (0..num_threads / 2)
63            .map(|_| {
64                let graph = graph.clone();
65                thread::spawn(move || {
66                    let mut read_count = 0;
67                    for _ in 0..ops_per_thread {
68                        let count = graph.len();
69                        if count > 0 {
70                            read_count += 1;
71                        }
72                        // Perform some pattern matching
73                        let _ = graph.match_pattern(None, None, None);
74                    }
75                    read_count
76                })
77            })
78            .collect();
79
80        // Wait for writers
81        for handle in writer_handles {
82            handle.join().unwrap();
83        }
84
85        // Wait for readers
86        let total_reads: usize = reader_handles.into_iter().map(|h| h.join().unwrap()).sum();
87
88        let duration = start.elapsed();
89
90        // Verify final state
91        assert_eq!(graph.len(), (num_threads / 2) * ops_per_thread);
92
93        println!("Concurrent stress test completed:");
94        println!("  Duration: {duration:?}");
95        let total_writes = (num_threads / 2) * ops_per_thread;
96        println!("  Total writes: {total_writes}");
97        println!("  Total reads: {total_reads}");
98        let graph_size = graph.len();
99        println!("  Final graph size: {graph_size}");
100        let stats = graph.stats();
101        println!("  Stats: {stats:?}");
102    }
103
104    #[test]
105    fn test_memory_reclamation() {
106        let graph = Arc::new(ConcurrentGraph::new());
107        let num_cycles = 10;
108        let triples_per_cycle = 1000;
109
110        for cycle in 0..num_cycles {
111            // Insert triples
112            let triples: Vec<_> = (0..triples_per_cycle)
113                .map(|i| create_test_triple(cycle * triples_per_cycle + i))
114                .collect();
115
116            graph.insert_batch(triples.clone()).unwrap();
117            assert_eq!(graph.len(), triples_per_cycle);
118
119            // Remove all triples
120            graph.remove_batch(&triples).unwrap();
121            assert_eq!(graph.len(), 0);
122
123            // Force memory reclamation
124            graph.collect();
125        }
126
127        // Final verification
128        assert!(graph.is_empty());
129    }
130
131    #[test]
132    fn test_concurrent_mixed_operations() {
133        let graph = Arc::new(ConcurrentGraph::new());
134        let num_threads = 6;
135        let ops_per_thread = 500;
136
137        let handles: Vec<_> = (0..num_threads)
138            .map(|thread_id| {
139                let graph = graph.clone();
140                thread::spawn(move || {
141                    let mut local_count = 0;
142                    for i in 0..ops_per_thread {
143                        let id = thread_id * ops_per_thread + i;
144                        let triple = create_test_triple(id);
145
146                        match i % 3 {
147                            0 => {
148                                // Insert
149                                if graph.insert(triple).unwrap() {
150                                    local_count += 1;
151                                }
152                            }
153                            1 => {
154                                // Query
155                                let _ = graph.contains(&triple);
156                                let _ = graph.match_pattern(Some(triple.subject()), None, None);
157                            }
158                            2 => {
159                                // Remove (might fail if not inserted)
160                                if graph.remove(&triple).unwrap() {
161                                    local_count -= 1;
162                                }
163                            }
164                            _ => unreachable!(),
165                        }
166                    }
167                    local_count
168                })
169            })
170            .collect();
171
172        let total_net_insertions: i32 = handles.into_iter().map(|h| h.join().unwrap()).sum();
173
174        println!("Mixed operations test:");
175        println!("  Net insertions: {total_net_insertions}");
176        let graph_size = graph.len();
177        println!("  Final graph size: {graph_size}");
178        let stats = graph.stats();
179        println!("  Stats: {stats:?}");
180
181        // The graph size should be consistent with net insertions
182        assert!(total_net_insertions >= 0);
183    }
184
185    #[test]
186    fn test_epoch_progression() {
187        let graph = Arc::new(ConcurrentGraph::new());
188        let initial_stats = graph.stats();
189
190        // Perform operations
191        for i in 0..100 {
192            let triple = create_test_triple(i);
193            graph.insert(triple.clone()).unwrap();
194            graph.remove(&triple).unwrap();
195        }
196
197        // Force collection multiple times
198        for _ in 0..5 {
199            graph.collect();
200        }
201
202        let final_stats = graph.stats();
203
204        assert!(final_stats.operation_count > initial_stats.operation_count);
205        assert!(final_stats.current_epoch > initial_stats.current_epoch);
206        assert_eq!(final_stats.triple_count, 0);
207    }
208}