oxirs_core/concurrent/
mod.rs1pub mod batch_builder;
8pub mod epoch;
9pub mod lock_free_graph;
10pub mod parallel_batch;
11
12pub use batch_builder::{BatchBuilder, BatchBuilderConfig, BatchBuilderStats, CoalescingStrategy};
13pub use epoch::{EpochManager, HazardPointer, VersionedPointer};
14pub use lock_free_graph::{ConcurrentGraph, GraphStats};
15pub use parallel_batch::{
16 BatchConfig, BatchOperation, BatchStats, BatchStatsSummary, ParallelBatchProcessor,
17 ProgressCallback,
18};
19
20pub use crossbeam_epoch::{pin, Guard};
22
23#[cfg(test)]
24mod tests {
25 use super::*;
26 use crate::model::{NamedNode, Object, Predicate, Subject, Triple};
27 use std::sync::Arc;
28 use std::thread;
29 use std::time::Instant;
30
31 fn create_test_triple(id: usize) -> Triple {
32 Triple::new(
33 Subject::NamedNode(NamedNode::new(format!("http://subject/{id}")).unwrap()),
34 Predicate::NamedNode(NamedNode::new(format!("http://predicate/{id}")).unwrap()),
35 Object::NamedNode(NamedNode::new(format!("http://object/{id}")).unwrap()),
36 )
37 }
38
/// Stress test: half of the threads insert triples while the other half read
/// the graph concurrently. Once all threads have finished, the graph must
/// contain exactly one triple per insert.
#[test]
fn test_concurrent_stress() {
    let graph = Arc::new(ConcurrentGraph::new());
    let num_threads = 8;
    let ops_per_thread = 1000;
    let workers_per_role = num_threads / 2;
    let expected_writes = workers_per_role * ops_per_thread;

    let start = Instant::now();

    // Writers: every thread inserts its own disjoint range of ids.
    let mut writer_handles = Vec::with_capacity(workers_per_role);
    for thread_id in 0..workers_per_role {
        let shared = Arc::clone(&graph);
        writer_handles.push(thread::spawn(move || {
            let first_id = thread_id * ops_per_thread;
            for id in first_id..first_id + ops_per_thread {
                shared.insert(create_test_triple(id)).unwrap();
            }
        }));
    }

    // Readers: poll the size and run a full wildcard match on every step,
    // counting how many times the graph was observed to be non-empty.
    let mut reader_handles = Vec::with_capacity(workers_per_role);
    for _ in 0..workers_per_role {
        let shared = Arc::clone(&graph);
        reader_handles.push(thread::spawn(move || {
            let mut non_empty_reads = 0usize;
            for _ in 0..ops_per_thread {
                if shared.len() > 0 {
                    non_empty_reads += 1;
                }
                let _ = shared.match_pattern(None, None, None);
            }
            non_empty_reads
        }));
    }

    // Writers are joined first; a panic in any worker fails the test here.
    for writer in writer_handles {
        writer.join().unwrap();
    }

    let mut total_reads: usize = 0;
    for reader in reader_handles {
        total_reads += reader.join().unwrap();
    }

    let duration = start.elapsed();

    assert_eq!(graph.len(), expected_writes);

    println!("Concurrent stress test completed:");
    println!(" Duration: {duration:?}");
    let total_writes = expected_writes;
    println!(" Total writes: {total_writes}");
    println!(" Total reads: {total_reads}");
    let graph_size = graph.len();
    println!(" Final graph size: {graph_size}");
    let stats = graph.stats();
    println!(" Stats: {stats:?}");
}
103
/// Fills and drains the graph in batches over several cycles, calling
/// `collect()` after each one (presumably to trigger reclamation of removed
/// entries), and checks that the size returns to zero every time.
#[test]
fn test_memory_reclamation() {
    let graph = Arc::new(ConcurrentGraph::new());
    let num_cycles = 10;
    let triples_per_cycle = 1000;

    for cycle in 0..num_cycles {
        // Each cycle uses a fresh id range so no triple is ever reused.
        let first_id = cycle * triples_per_cycle;
        let batch: Vec<_> = (first_id..first_id + triples_per_cycle)
            .map(create_test_triple)
            .collect();

        // The batch is cloned because it is needed again for the removal.
        graph.insert_batch(batch.clone()).unwrap();
        assert_eq!(graph.len(), triples_per_cycle);

        graph.remove_batch(&batch).unwrap();
        assert_eq!(graph.len(), 0);

        graph.collect();
    }

    assert!(graph.is_empty());
}
130
/// Runs interleaved inserts, lookups, and removals from several threads and
/// checks that the final graph size equals the net number of insertions.
///
/// Every thread works on its own disjoint id range and cycles through three
/// steps: insert a triple, query the graph, then remove the triple it inserted
/// two steps earlier. Because a thread only removes triples it inserted
/// itself, the counts are deterministic regardless of thread interleaving.
#[test]
fn test_concurrent_mixed_operations() {
    let graph = Arc::new(ConcurrentGraph::new());
    let num_threads = 6;
    let ops_per_thread = 500;

    let handles: Vec<_> = (0..num_threads)
        .map(|thread_id| {
            let graph = Arc::clone(&graph);
            thread::spawn(move || {
                let mut inserted = 0usize;
                let mut removed = 0usize;
                for i in 0..ops_per_thread {
                    let id = thread_id * ops_per_thread + i;

                    match i % 3 {
                        0 => {
                            if graph.insert(create_test_triple(id)).unwrap() {
                                inserted += 1;
                            }
                        }
                        1 => {
                            // Read-only step; results are intentionally ignored.
                            let triple = create_test_triple(id);
                            let _ = graph.contains(&triple);
                            let _ = graph.match_pattern(Some(triple.subject()), None, None);
                        }
                        2 => {
                            // `i >= 2` here, so `id - 2` cannot underflow; it is
                            // the id this thread inserted at step `i - 2`.
                            let target = create_test_triple(id - 2);
                            if graph.remove(&target).unwrap() {
                                removed += 1;
                            }
                        }
                        _ => unreachable!(),
                    }
                }
                (inserted, removed)
            })
        })
        .collect();

    let (total_inserted, total_removed) = handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .fold((0usize, 0usize), |(ins, rem), (i, r)| (ins + i, rem + r));

    // Removals only target previously inserted triples, so they can never
    // outnumber insertions.
    let total_net_insertions = total_inserted
        .checked_sub(total_removed)
        .expect("more removals than insertions were reported as successful");

    println!("Mixed operations test:");
    println!(" Net insertions: {total_net_insertions}");
    let graph_size = graph.len();
    println!(" Final graph size: {graph_size}");
    let stats = graph.stats();
    println!(" Stats: {stats:?}");

    assert_eq!(graph_size, total_net_insertions);
}
184
/// Inserts and immediately removes a series of triples, calls `collect()`
/// several times, and checks that the operation and epoch counters reported
/// by `stats()` have advanced while the graph ends up with no triples.
#[test]
fn test_epoch_progression() {
    let graph = Arc::new(ConcurrentGraph::new());
    let before = graph.stats();

    // One insert followed by one remove per triple, processed in id order.
    (0..100).map(create_test_triple).for_each(|triple| {
        graph.insert(triple.clone()).unwrap();
        graph.remove(&triple).unwrap();
    });

    let mut remaining_collections = 5;
    while remaining_collections > 0 {
        graph.collect();
        remaining_collections -= 1;
    }

    let after = graph.stats();

    assert!(after.operation_count > before.operation_count);
    assert!(after.current_epoch > before.current_epoch);
    assert_eq!(after.triple_count, 0);
}
208}