use crate::concurrent::{BatchConfig, BatchOperation, ParallelBatchProcessor};
use crate::model::*;
use crate::Result;
#[cfg(feature = "parallel")]
use rayon::prelude::*;
use std::collections::BTreeSet;
use std::sync::Arc;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Graph {
triples: BTreeSet<Triple>,
}
impl Graph {
pub fn new() -> Self {
Graph {
triples: BTreeSet::new(),
}
}
pub fn from_triples<I>(triples: I) -> Self
where
I: IntoIterator<Item = Triple>,
{
Graph {
triples: triples.into_iter().collect(),
}
}
pub fn add_triple(&mut self, triple: Triple) -> bool {
self.triples.insert(triple)
}
pub fn add_triple_str(&mut self, subject: &str, predicate: &str, object: &str) -> Result<bool> {
let subject_node = NamedNode::new(subject)?;
let predicate_node = NamedNode::new(predicate)?;
let object_literal = Literal::new(object);
let triple = Triple::new(subject_node, predicate_node, object_literal);
Ok(self.add_triple(triple))
}
pub fn remove_triple(&mut self, triple: &Triple) -> bool {
self.triples.remove(triple)
}
pub fn contains_triple(&self, triple: &Triple) -> bool {
self.triples.contains(triple)
}
pub fn query_triples(
&self,
subject: Option<&Subject>,
predicate: Option<&Predicate>,
object: Option<&Object>,
) -> Vec<Triple> {
self.triples
.iter()
.filter(|triple| triple.matches_pattern(subject, predicate, object))
.cloned()
.collect()
}
pub fn triples(&self) -> Vec<Triple> {
self.triples.iter().cloned().collect()
}
pub fn iter_triples(&self) -> impl Iterator<Item = &Triple> {
self.triples.iter()
}
pub fn insert(&mut self, triple: Triple) -> bool {
self.add_triple(triple)
}
pub fn iter(&self) -> impl Iterator<Item = &Triple> {
self.triples.iter()
}
pub fn contains(&self, triple: &Triple) -> bool {
self.contains_triple(triple)
}
pub fn subjects(&self) -> BTreeSet<Subject> {
self.triples.iter().map(|t| t.subject().clone()).collect()
}
pub fn predicates(&self) -> BTreeSet<Predicate> {
self.triples.iter().map(|t| t.predicate().clone()).collect()
}
pub fn objects(&self) -> BTreeSet<Object> {
self.triples.iter().map(|t| t.object().clone()).collect()
}
pub fn merge(&mut self, other: &Graph) {
for triple in &other.triples {
self.triples.insert(triple.clone());
}
}
pub fn union(&self, other: &Graph) -> Graph {
let mut result = self.clone();
result.merge(other);
result
}
pub fn intersection(&self, other: &Graph) -> Graph {
let intersection_triples: BTreeSet<Triple> =
self.triples.intersection(&other.triples).cloned().collect();
Graph {
triples: intersection_triples,
}
}
pub fn difference(&self, other: &Graph) -> Graph {
let difference_triples: BTreeSet<Triple> =
self.triples.difference(&other.triples).cloned().collect();
Graph {
triples: difference_triples,
}
}
pub fn clear(&mut self) {
self.triples.clear();
}
pub fn len(&self) -> usize {
self.triples.len()
}
pub fn is_empty(&self) -> bool {
self.triples.is_empty()
}
pub fn is_isomorphic_to(&self, other: &Graph) -> bool {
self.triples == other.triples
}
#[cfg(feature = "parallel")]
pub fn par_insert_batch(&mut self, triples: Vec<Triple>) -> Result<usize> {
if triples.is_empty() {
return Ok(0);
}
let config = BatchConfig::auto();
let batch_size = config.batch_size;
let processor = ParallelBatchProcessor::new(config);
let operations: Vec<_> = triples
.par_chunks(batch_size)
.map(|chunk| BatchOperation::insert(chunk.to_vec()))
.collect();
processor.submit_batch(operations)?;
let all_triples = Arc::new(parking_lot::Mutex::new(Vec::new()));
let all_triples_clone = all_triples.clone();
processor.process(move |op| -> Result<()> {
match op {
BatchOperation::Insert(batch_triples) => {
all_triples_clone.lock().extend(batch_triples);
Ok(())
}
_ => Ok(()),
}
})?;
let mut inserted = 0;
for triple in all_triples.lock().drain(..) {
if self.triples.insert(triple) {
inserted += 1;
}
}
Ok(inserted)
}
#[cfg(feature = "parallel")]
pub fn par_remove_batch(&mut self, triples: Vec<Triple>) -> Result<usize> {
if triples.is_empty() {
return Ok(0);
}
let config = BatchConfig::auto();
let batch_size = config.batch_size;
let processor = ParallelBatchProcessor::new(config);
let operations: Vec<_> = triples
.par_chunks(batch_size)
.map(|chunk| BatchOperation::remove(chunk.to_vec()))
.collect();
processor.submit_batch(operations)?;
let triples_to_remove = Arc::new(parking_lot::Mutex::new(Vec::new()));
let triples_clone = triples_to_remove.clone();
processor.process(move |op| -> Result<()> {
match op {
BatchOperation::Remove(batch_triples) => {
triples_clone.lock().extend(batch_triples);
Ok(())
}
_ => Ok(()),
}
})?;
let mut removed = 0;
for triple in triples_to_remove.lock().drain(..) {
if self.triples.remove(&triple) {
removed += 1;
}
}
Ok(removed)
}
#[cfg(feature = "parallel")]
pub fn par_query_batch(
&self,
queries: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
) -> Result<Vec<Vec<Triple>>> {
if queries.is_empty() {
return Ok(vec![]);
}
let config = BatchConfig::auto();
let processor = ParallelBatchProcessor::new(config);
let operations: Vec<_> = queries
.into_iter()
.map(|(s, p, o)| BatchOperation::query(s, p, o))
.collect();
processor.submit_batch(operations)?;
let triples = self.triples.clone();
let results = processor.process(move |op| -> Result<Vec<Triple>> {
match op {
BatchOperation::Query {
subject,
predicate,
object,
} => {
let matching: Vec<Triple> = triples
.iter()
.filter(|triple| {
triple.matches_pattern(
subject.as_ref(),
predicate.as_ref(),
object.as_ref(),
)
})
.cloned()
.collect();
Ok(matching)
}
_ => Ok(vec![]),
}
})?;
Ok(results)
}
#[cfg(feature = "parallel")]
pub fn par_transform<F>(&mut self, transform_fn: F) -> Result<(usize, usize)>
where
F: Fn(&Triple) -> Option<Triple> + Send + Sync + 'static,
{
let triples: Vec<Triple> = self.triples.iter().cloned().collect();
if triples.is_empty() {
return Ok((0, 0));
}
let transform_fn = Arc::new(transform_fn);
let results: Vec<(Option<Triple>, Triple)> = triples
.par_iter()
.map(|triple| {
let result = transform_fn(triple);
(result, triple.clone())
})
.collect();
let mut transformed = 0;
let mut removed = 0;
for (new_triple, old_triple) in results {
match new_triple {
Some(new) if new != old_triple => {
self.triples.remove(&old_triple);
self.triples.insert(new);
transformed += 1;
}
None => {
self.triples.remove(&old_triple);
removed += 1;
}
_ => {} }
}
Ok((transformed, removed))
}
#[cfg(feature = "parallel")]
pub fn par_iter(&self) -> impl ParallelIterator<Item = &Triple> {
self.triples.par_iter()
}
#[cfg(feature = "parallel")]
pub fn par_count_patterns(
&self,
patterns: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
) -> Vec<usize> {
patterns
.par_iter()
.map(|(subject, predicate, object)| {
self.triples
.iter()
.filter(|triple| {
triple.matches_pattern(
subject.as_ref(),
predicate.as_ref(),
object.as_ref(),
)
})
.count()
})
.collect()
}
#[cfg(feature = "parallel")]
pub fn par_unique_terms(&self) -> (BTreeSet<Subject>, BTreeSet<Predicate>, BTreeSet<Object>) {
let terms: Vec<(Subject, Predicate, Object)> = self
.triples
.par_iter()
.map(|triple| {
(
triple.subject().clone(),
triple.predicate().clone(),
triple.object().clone(),
)
})
.collect();
let mut subjects = BTreeSet::new();
let mut predicates = BTreeSet::new();
let mut objects = BTreeSet::new();
for (s, p, o) in terms {
subjects.insert(s);
predicates.insert(p);
objects.insert(o);
}
(subjects, predicates, objects)
}
}
impl Default for Graph {
fn default() -> Self {
Self::new()
}
}
pub struct GraphIter<'a> {
inner: std::collections::btree_set::Iter<'a, Triple>,
}
impl<'a> Iterator for GraphIter<'a> {
type Item = &'a Triple;
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}
}
impl<'a> IntoIterator for &'a Graph {
type Item = &'a Triple;
type IntoIter = GraphIter<'a>;
fn into_iter(self) -> Self::IntoIter {
GraphIter {
inner: self.triples.iter(),
}
}
}
impl IntoIterator for Graph {
type Item = Triple;
type IntoIter = std::collections::btree_set::IntoIter<Triple>;
fn into_iter(self) -> Self::IntoIter {
self.triples.into_iter()
}
}
impl FromIterator<Triple> for Graph {
fn from_iter<I: IntoIterator<Item = Triple>>(iter: I) -> Self {
Graph {
triples: iter.into_iter().collect(),
}
}
}
impl Extend<Triple> for Graph {
fn extend<I: IntoIterator<Item = Triple>>(&mut self, iter: I) {
self.triples.extend(iter);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Instant;
fn create_test_triple(id: usize) -> Triple {
Triple::new(
Subject::NamedNode(
NamedNode::new(format!("http://subject/{id}")).expect("valid IRI from format"),
),
Predicate::NamedNode(
NamedNode::new(format!("http://predicate/{id}")).expect("valid IRI from format"),
),
Object::NamedNode(
NamedNode::new(format!("http://object/{id}")).expect("valid IRI from format"),
),
)
}
fn create_test_triples(count: usize) -> Vec<Triple> {
(0..count).map(create_test_triple).collect()
}
#[test]
fn test_par_insert_batch() {
let mut graph = Graph::new();
let triples = create_test_triples(10000);
let start = Instant::now();
let inserted = graph
.par_insert_batch(triples.clone())
.expect("parallel batch insert should succeed");
let duration = start.elapsed();
println!("Parallel insert of 10000 triples took: {duration:?}");
assert_eq!(inserted, 10000);
assert_eq!(graph.len(), 10000);
for triple in &triples {
assert!(graph.contains_triple(triple));
}
}
#[test]
fn test_par_insert_batch_with_duplicates() {
let mut graph = Graph::new();
let mut triples = create_test_triples(5000);
triples.extend(create_test_triples(2500));
let inserted = graph
.par_insert_batch(triples)
.expect("graph operation should succeed");
assert_eq!(inserted, 5000);
assert_eq!(graph.len(), 5000);
}
#[test]
fn test_par_remove_batch() {
let mut graph = Graph::new();
let triples = create_test_triples(10000);
graph.extend(triples.clone());
let to_remove: Vec<Triple> = triples.iter().step_by(2).cloned().collect();
let start = Instant::now();
let removed = graph
.par_remove_batch(to_remove.clone())
.expect("parallel batch remove should succeed");
let duration = start.elapsed();
println!("Parallel remove of 5000 triples took: {duration:?}");
assert_eq!(removed, 5000);
assert_eq!(graph.len(), 5000);
for (i, triple) in triples.iter().enumerate() {
if i % 2 == 0 {
assert!(!graph.contains_triple(triple));
} else {
assert!(graph.contains_triple(triple));
}
}
}
#[test]
fn test_par_query_batch() {
let mut graph = Graph::new();
let triples = create_test_triples(1000);
graph.extend(triples);
let queries: Vec<_> = (0..100)
.map(|i| {
(
Some(Subject::NamedNode(
NamedNode::new(format!("http://subject/{i}"))
.expect("valid IRI from format"),
)),
None,
None,
)
})
.collect();
let start = Instant::now();
let results = graph
.par_query_batch(queries)
.expect("graph operation should succeed");
let duration = start.elapsed();
println!("Parallel query of 100 patterns took: {duration:?}");
assert_eq!(results.len(), 100);
for (i, result) in results.iter().enumerate() {
if i < 1000 {
assert_eq!(result.len(), 1);
} else {
assert_eq!(result.len(), 0);
}
}
}
#[test]
fn test_par_transform() {
let mut graph = Graph::new();
let triples = create_test_triples(1000);
graph.extend(triples);
let transform_fn = |triple: &Triple| -> Option<Triple> {
if let Subject::NamedNode(node) = triple.subject() {
let uri = node.as_str();
if let Some(id_str) = uri.strip_prefix("http://subject/") {
if let Ok(id) = id_str.parse::<usize>() {
if id % 2 == 0 {
return Some(Triple::new(
triple.subject().clone(),
Predicate::NamedNode(
NamedNode::new("http://predicate/transformed")
.expect("valid IRI"),
),
triple.object().clone(),
));
} else if id % 3 == 0 {
return None;
}
}
}
}
Some(triple.clone())
};
let start = Instant::now();
let (transformed, removed) = graph
.par_transform(transform_fn)
.expect("graph operation should succeed");
let duration = start.elapsed();
println!("Parallel transform took: {duration:?}");
println!("Transformed: {transformed}, Removed: {removed}");
let transformed_predicate = Predicate::NamedNode(
NamedNode::new("http://predicate/transformed").expect("valid IRI"),
);
let transformed_count = graph
.query_triples(None, Some(&transformed_predicate), None)
.len();
assert!(transformed_count > 0);
}
#[test]
fn test_par_count_patterns() {
let mut graph = Graph::new();
for i in 0..100 {
for j in 0..10 {
let triple = Triple::new(
Subject::NamedNode(
NamedNode::new(format!("http://subject/{i}"))
.expect("valid IRI from format"),
),
Predicate::NamedNode(
NamedNode::new(format!("http://predicate/{j}"))
.expect("valid IRI from format"),
),
Object::NamedNode(
NamedNode::new(format!("http://object/{}", i * 10 + j))
.expect("valid IRI from format"),
),
);
graph.add_triple(triple);
}
}
let patterns: Vec<_> = (0..10)
.map(|i| {
(
None,
Some(Predicate::NamedNode(
NamedNode::new(format!("http://predicate/{i}"))
.expect("valid IRI from format"),
)),
None,
)
})
.collect();
let counts = graph.par_count_patterns(patterns);
for count in counts {
assert_eq!(count, 100);
}
}
#[test]
fn test_par_unique_terms() {
let mut graph = Graph::new();
let triples = create_test_triples(1000);
graph.extend(triples);
let start = Instant::now();
let (subjects, predicates, objects) = graph.par_unique_terms();
let duration = start.elapsed();
println!("Parallel unique terms extraction took: {duration:?}");
assert_eq!(subjects.len(), 1000);
assert_eq!(predicates.len(), 1000);
assert_eq!(objects.len(), 1000);
}
#[test]
fn test_par_iter() {
let mut graph = Graph::new();
let triples = create_test_triples(1000);
graph.extend(triples);
let count = graph.par_iter().count();
assert_eq!(count, 1000);
let filtered: Vec<_> = graph
.par_iter()
.filter(|triple| {
if let Subject::NamedNode(node) = triple.subject() {
node.as_str().ends_with("0")
} else {
false
}
})
.cloned()
.collect();
assert_eq!(filtered.len(), 100);
}
#[test]
fn test_parallel_performance_comparison() {
let triple_count = 50000;
let triples = create_test_triples(triple_count);
let mut graph1 = Graph::new();
let start = Instant::now();
for triple in &triples {
graph1.add_triple(triple.clone());
}
let seq_duration = start.elapsed();
let mut graph2 = Graph::new();
let start = Instant::now();
graph2
.par_insert_batch(triples.clone())
.expect("parallel batch insert should succeed");
let par_duration = start.elapsed();
println!("Performance comparison for {triple_count} triples:");
println!(" Sequential insert: {seq_duration:?}");
println!(" Parallel insert: {par_duration:?}");
println!(
" Speedup: {:.2}x",
seq_duration.as_secs_f64() / par_duration.as_secs_f64()
);
assert_eq!(graph1.len(), graph2.len());
}
#[test]
fn test_empty_operations() {
let mut graph = Graph::new();
let inserted = graph
.par_insert_batch(vec![])
.expect("graph operation should succeed");
assert_eq!(inserted, 0);
let removed = graph
.par_remove_batch(vec![])
.expect("graph operation should succeed");
assert_eq!(removed, 0);
let results = graph
.par_query_batch(vec![])
.expect("graph operation should succeed");
assert!(results.is_empty());
let (transformed, removed) = graph
.par_transform(|t| Some(t.clone()))
.expect("parallel transform should succeed");
assert_eq!(transformed, 0);
assert_eq!(removed, 0);
}
}
#[derive(Debug, Clone)]
pub struct ConcurrentGraph {
inner: Arc<parking_lot::RwLock<Graph>>,
}
impl ConcurrentGraph {
pub fn new() -> Self {
Self {
inner: Arc::new(parking_lot::RwLock::new(Graph::new())),
}
}
pub fn from_graph(graph: Graph) -> Self {
Self {
inner: Arc::new(parking_lot::RwLock::new(graph)),
}
}
pub fn add_triple(&self, triple: Triple) -> bool {
self.inner.write().add_triple(triple)
}
pub fn add_triples(&self, triples: Vec<Triple>) -> usize {
let mut graph = self.inner.write();
let mut added = 0;
for triple in triples {
if graph.add_triple(triple) {
added += 1;
}
}
added
}
pub fn remove_triple(&self, triple: &Triple) -> bool {
self.inner.write().remove_triple(triple)
}
pub fn contains_triple(&self, triple: &Triple) -> bool {
self.inner.read().contains_triple(triple)
}
pub fn query_triples(
&self,
subject: Option<&Subject>,
predicate: Option<&Predicate>,
object: Option<&Object>,
) -> Vec<Triple> {
self.inner.read().query_triples(subject, predicate, object)
}
pub fn len(&self) -> usize {
self.inner.read().len()
}
pub fn is_empty(&self) -> bool {
self.inner.read().is_empty()
}
pub fn triples(&self) -> Vec<Triple> {
self.inner.read().triples()
}
pub fn merge(&self, other: &Graph) {
self.inner.write().merge(other)
}
pub fn merge_concurrent(&self, other: &ConcurrentGraph) {
let other_triples = other.triples();
let mut graph = self.inner.write();
for triple in other_triples {
graph.add_triple(triple);
}
}
pub fn union(&self, other: &Graph) -> Graph {
self.inner.read().union(other)
}
pub fn intersection(&self, other: &Graph) -> Graph {
self.inner.read().intersection(other)
}
pub fn clear(&self) {
self.inner.write().clear()
}
pub fn with_read<F, R>(&self, f: F) -> R
where
F: FnOnce(&Graph) -> R,
{
let graph = self.inner.read();
f(&graph)
}
pub fn with_write<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut Graph) -> R,
{
let mut graph = self.inner.write();
f(&mut graph)
}
#[cfg(feature = "parallel")]
pub fn par_insert_batch(&self, triples: Vec<Triple>) -> Result<usize> {
self.inner.write().par_insert_batch(triples)
}
#[cfg(feature = "parallel")]
pub fn par_remove_batch(&self, triples: Vec<Triple>) -> Result<usize> {
self.inner.write().par_remove_batch(triples)
}
#[cfg(feature = "parallel")]
pub fn par_query_batch(
&self,
queries: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
) -> Result<Vec<Vec<Triple>>> {
self.inner.read().par_query_batch(queries)
}
pub fn subjects(&self) -> BTreeSet<Subject> {
self.inner.read().subjects()
}
pub fn predicates(&self) -> BTreeSet<Predicate> {
self.inner.read().predicates()
}
pub fn objects(&self) -> BTreeSet<Object> {
self.inner.read().objects()
}
}
impl Default for ConcurrentGraph {
fn default() -> Self {
Self::new()
}
}
#[allow(dead_code)]
pub struct GraphThreadPool {
#[cfg(feature = "parallel")]
pool: rayon::ThreadPool,
max_batch_size: usize,
}
impl GraphThreadPool {
pub fn new() -> Result<Self> {
#[cfg(feature = "parallel")]
{
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get())
.thread_name(|index| format!("oxirs-graph-{index}"))
.build()
.map_err(|e| crate::OxirsError::ConcurrencyError(e.to_string()))?;
Ok(Self {
pool,
max_batch_size: 10_000,
})
}
#[cfg(not(feature = "parallel"))]
{
Ok(Self {
max_batch_size: 10_000,
})
}
}
pub fn with_config(num_threads: usize, max_batch_size: usize) -> Result<Self> {
#[cfg(feature = "parallel")]
{
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(|index| format!("oxirs-graph-{index}"))
.build()
.map_err(|e| crate::OxirsError::ConcurrencyError(e.to_string()))?;
Ok(Self {
pool,
max_batch_size,
})
}
#[cfg(not(feature = "parallel"))]
{
Ok(Self { max_batch_size })
}
}
pub fn process_triples<F, R>(&self, triples: Vec<Triple>, processor: F) -> Vec<R>
where
F: Fn(Triple) -> R + Sync + Send,
R: Send,
{
#[cfg(feature = "parallel")]
{
self.pool
.install(|| triples.into_par_iter().map(processor).collect())
}
#[cfg(not(feature = "parallel"))]
{
triples.into_iter().map(processor).collect()
}
}
pub fn process_graphs<F, R>(&self, graphs: Vec<Graph>, processor: F) -> Vec<R>
where
F: Fn(Graph) -> R + Sync + Send,
R: Send,
{
#[cfg(feature = "parallel")]
{
self.pool
.install(|| graphs.into_par_iter().map(processor).collect())
}
#[cfg(not(feature = "parallel"))]
{
graphs.into_iter().map(processor).collect()
}
}
pub fn merge_graphs(&self, graphs: Vec<Graph>) -> Graph {
if graphs.is_empty() {
return Graph::new();
}
#[cfg(feature = "parallel")]
{
self.pool.install(|| {
graphs.into_par_iter().reduce(Graph::new, |mut acc, graph| {
acc.merge(&graph);
acc
})
})
}
#[cfg(not(feature = "parallel"))]
{
graphs.into_iter().fold(Graph::new(), |mut acc, graph| {
acc.merge(&graph);
acc
})
}
}
#[cfg(feature = "parallel")]
pub fn inner(&self) -> &rayon::ThreadPool {
&self.pool
}
}
impl Default for GraphThreadPool {
fn default() -> Self {
Self::new().expect("Failed to create default thread pool")
}
}
#[cfg(test)]
mod concurrent_tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
#[test]
fn test_concurrent_graph_basic_operations() {
let graph = ConcurrentGraph::new();
let triple = Triple::new(
NamedNode::new("http://example.org/s").expect("valid IRI"),
NamedNode::new("http://example.org/p").expect("valid IRI"),
Literal::new("test"),
);
assert!(graph.add_triple(triple.clone()));
assert!(graph.contains_triple(&triple));
assert_eq!(graph.len(), 1);
assert!(!graph.is_empty());
assert!(graph.remove_triple(&triple));
assert!(!graph.contains_triple(&triple));
assert_eq!(graph.len(), 0);
assert!(graph.is_empty());
}
#[test]
fn test_concurrent_access() {
let graph = ConcurrentGraph::new();
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for i in 0..10 {
let g = graph.clone();
let c = counter.clone();
handles.push(thread::spawn(move || {
for j in 0..100 {
let triple = Triple::new(
NamedNode::new(format!("http://example.org/s{}", i * 100 + j))
.expect("valid IRI from format"),
NamedNode::new("http://example.org/p").expect("valid IRI"),
Literal::new(format!("value{j}")),
);
if g.add_triple(triple) {
c.fetch_add(1, Ordering::Relaxed);
}
thread::sleep(Duration::from_nanos(1));
}
}));
}
for handle in handles {
handle.join().expect("thread should not panic");
}
assert_eq!(counter.load(Ordering::Relaxed), 1000);
assert_eq!(graph.len(), 1000);
}
#[test]
fn test_concurrent_graph_merge() {
let graph1 = ConcurrentGraph::new();
let graph2 = ConcurrentGraph::new();
for i in 0..100 {
let triple1 = Triple::new(
NamedNode::new(format!("http://example.org/s1_{i}"))
.expect("valid IRI from format"),
NamedNode::new("http://example.org/p").expect("valid IRI"),
Literal::new(format!("value{i}")),
);
graph1.add_triple(triple1);
let triple2 = Triple::new(
NamedNode::new(format!("http://example.org/s2_{i}"))
.expect("valid IRI from format"),
NamedNode::new("http://example.org/p").expect("valid IRI"),
Literal::new(format!("value{i}")),
);
graph2.add_triple(triple2);
}
graph1.merge_concurrent(&graph2);
assert_eq!(graph1.len(), 200);
assert_eq!(graph2.len(), 100);
}
#[test]
fn test_graph_thread_pool() {
let pool = GraphThreadPool::new().expect("thread pool creation should succeed");
let triples: Vec<Triple> = (0..1000)
.map(|i| {
Triple::new(
NamedNode::new(format!("http://example.org/s{i}"))
.expect("valid IRI from format"),
NamedNode::new("http://example.org/p").expect("valid IRI"),
Literal::new(format!("value{i}")),
)
})
.collect();
let results = pool.process_triples(triples.clone(), |triple| {
triple.to_string().len()
});
assert_eq!(results.len(), 1000);
assert!(results.iter().all(|&len| len > 0));
}
#[test]
fn test_concurrent_with_operations() {
let graph = ConcurrentGraph::new();
let initial_len = graph.with_read(|g| g.len());
assert_eq!(initial_len, 0);
graph.with_write(|g| {
for i in 0..10 {
let triple = Triple::new(
NamedNode::new(format!("http://example.org/s{i}"))
.expect("valid IRI from format"),
NamedNode::new("http://example.org/p").expect("valid IRI"),
Literal::new(format!("value{i}")),
);
g.add_triple(triple);
}
});
let final_len = graph.with_read(|g| g.len());
assert_eq!(final_len, 10);
}
}