#![allow(
unused_imports,
unused_variables,
dead_code,
clippy::uninlined_format_args,
clippy::bool_assert_comparison,
clippy::print_literal,
clippy::field_reassign_with_default,
clippy::needless_borrows_for_generic_args
)]
use oxirs_arq::{
algebra::{Algebra, Binding, Iri, Literal, Term, TriplePattern, Variable},
executor::{Dataset, ExecutionContext, InMemoryDataset, ParallelConfig, QueryExecutor},
Solution,
};
use oxirs_core::model::NamedNode;
use std::collections::HashMap;
use std::time::Instant;
/// Executes a two-pattern BGP (name and age per person) over 1000 people,
/// first with parallel execution enabled and then sequentially, and checks
/// that both runs produce the same number of solutions.
///
/// Wall-clock timings of both runs are printed for manual inspection only;
/// nothing is asserted about relative speed.
#[cfg(feature = "parallel")]
#[test]
fn test_parallel_bgp_execution() {
    let mut dataset = InMemoryDataset::new();
    for i in 0..1000 {
        // Both triples share the same subject, so format its IRI only once.
        let person_iri = format!("http://example.org/person{}", i);
        dataset.add_triple(
            Term::Iri(NamedNode::new_unchecked(&person_iri)),
            Term::Iri(NamedNode::new_unchecked("http://xmlns.com/foaf/0.1/name")),
            Term::Literal(Literal::string(format!("Person {}", i))),
        );
        dataset.add_triple(
            Term::Iri(NamedNode::new_unchecked(&person_iri)),
            Term::Iri(NamedNode::new_unchecked("http://xmlns.com/foaf/0.1/age")),
            Term::Literal(Literal::typed(
                (20 + (i % 60)).to_string(),
                NamedNode::new_unchecked("http://www.w3.org/2001/XMLSchema#integer"),
            )),
        );
    }

    // ?person foaf:name ?name . ?person foaf:age ?age
    let patterns = vec![
        TriplePattern {
            subject: Term::Variable(Variable::new("person").unwrap()),
            predicate: Term::Iri(NamedNode::new_unchecked("http://xmlns.com/foaf/0.1/name")),
            object: Term::Variable(Variable::new("name").unwrap()),
        },
        TriplePattern {
            subject: Term::Variable(Variable::new("person").unwrap()),
            predicate: Term::Iri(NamedNode::new_unchecked("http://xmlns.com/foaf/0.1/age")),
            object: Term::Variable(Variable::new("age").unwrap()),
        },
    ];
    let algebra = Algebra::Bgp(patterns);

    // Parallel run.
    let mut parallel_context = ExecutionContext::default();
    parallel_context.parallel = true;
    parallel_context.parallel_threshold = 100;
    let mut executor = QueryExecutor::with_context(parallel_context);
    let start = Instant::now();
    let (solution, stats) = executor.execute(&algebra, &dataset).unwrap();
    let parallel_time = start.elapsed();
    println!("Parallel execution time: {:?}", parallel_time);
    println!("Results found: {}", solution.len());
    println!("Stats: {:?}", stats);
    // Every person has exactly one name and one age, so one row per person.
    assert_eq!(solution.len(), 1000);

    // Sequential run over the same algebra and dataset as a baseline.
    let mut sequential_context = ExecutionContext::default();
    sequential_context.parallel = false;
    let mut executor = QueryExecutor::with_context(sequential_context);
    let start = Instant::now();
    let (solution2, _) = executor.execute(&algebra, &dataset).unwrap();
    let sequential_time = start.elapsed();
    println!("Sequential execution time: {:?}", sequential_time);
    assert_eq!(solution.len(), solution2.len());
}
/// Joins a `?parent parent ?child` pattern with a `?child name ?childName`
/// pattern under parallel execution (capped at 4 threads) and expects one
/// joined row for each of the 100 parent/child pairs in the dataset.
#[cfg(feature = "parallel")]
#[test]
fn test_parallel_join_execution() {
    // Small term builders to keep the fixture and patterns readable.
    let iri = |value: &str| Term::Iri(NamedNode::new_unchecked(value));
    let var = |name: &str| Term::Variable(Variable::new(name).unwrap());
    let person = |id: i32| iri(&format!("http://example.org/person{}", id));

    let mut dataset = InMemoryDataset::new();
    for parent_id in 0..100 {
        // Child ids are offset so they never collide with a parent id.
        let child_id = parent_id * 2 + 100;
        dataset.add_triple(
            person(parent_id),
            iri("http://example.org/parent"),
            person(child_id),
        );
        dataset.add_triple(
            person(child_id),
            iri("http://example.org/name"),
            Term::Literal(Literal::string(format!("Child of Person {}", parent_id))),
        );
    }

    let parent_to_child = TriplePattern {
        subject: var("parent"),
        predicate: iri("http://example.org/parent"),
        object: var("child"),
    };
    let child_to_name = TriplePattern {
        subject: var("child"),
        predicate: iri("http://example.org/name"),
        object: var("childName"),
    };
    let join_algebra = Algebra::Join {
        left: Box::new(Algebra::Bgp(vec![parent_to_child])),
        right: Box::new(Algebra::Bgp(vec![child_to_name])),
    };

    let mut ctx = ExecutionContext::default();
    ctx.parallel = true;
    ctx.parallel_config.max_threads = 4;
    let mut executor = QueryExecutor::with_context(ctx);

    let started = Instant::now();
    let (solution, stats) = executor.execute(&join_algebra, &dataset).unwrap();
    let elapsed = started.elapsed();

    println!("Parallel join execution time: {:?}", elapsed);
    println!("Join results: {}", solution.len());
    println!("Stats: {:?}", stats);
    assert_eq!(solution.len(), 100);
}
/// Groups 1000 people by department (10 departments, 100 people each) under
/// parallel execution, computing a COUNT of people and a SUM of salaries per
/// group, and expects exactly 10 result rows.
#[cfg(feature = "parallel")]
#[test]
fn test_parallel_aggregation() {
    use oxirs_arq::algebra::Expression;
    use oxirs_arq::{Aggregate, GroupCondition};

    // Small builders to keep the fixture and algebra readable.
    let iri = |value: &str| Term::Iri(NamedNode::new_unchecked(value));
    let variable = |name: &str| Variable::new(name).unwrap();
    let var_term = |name: &str| Term::Variable(Variable::new(name).unwrap());

    let mut dataset = InMemoryDataset::new();
    for idx in 0..1000 {
        let department = format!("Dept{}", idx % 10);
        let salary = (30000 + (idx * 100)).to_string();
        dataset.add_triple(
            iri(&format!("http://example.org/person{}", idx)),
            iri("http://example.org/department"),
            Term::Literal(Literal::string(department)),
        );
        dataset.add_triple(
            iri(&format!("http://example.org/person{}", idx)),
            iri("http://example.org/salary"),
            Term::Literal(Literal::typed(
                salary,
                NamedNode::new_unchecked("http://www.w3.org/2001/XMLSchema#integer"),
            )),
        );
    }

    // ?person department ?dept . ?person salary ?salary
    let department_pattern = TriplePattern {
        subject: var_term("person"),
        predicate: iri("http://example.org/department"),
        object: var_term("dept"),
    };
    let salary_pattern = TriplePattern {
        subject: var_term("person"),
        predicate: iri("http://example.org/salary"),
        object: var_term("salary"),
    };

    let count_people = Aggregate::Count {
        expr: Some(Expression::Variable(variable("person"))),
        distinct: false,
    };
    let sum_salaries = Aggregate::Sum {
        expr: Expression::Variable(variable("salary")),
        distinct: false,
    };
    let group_algebra = Algebra::Group {
        pattern: Box::new(Algebra::Bgp(vec![department_pattern, salary_pattern])),
        variables: vec![GroupCondition {
            expr: Expression::Variable(variable("dept")),
            alias: None,
        }],
        aggregates: vec![
            (variable("count"), count_people),
            (variable("totalSalary"), sum_salaries),
        ],
    };

    let mut ctx = ExecutionContext::default();
    ctx.parallel = true;
    let mut executor = QueryExecutor::with_context(ctx);

    let started = Instant::now();
    let (solution, stats) = executor.execute(&group_algebra, &dataset).unwrap();
    let elapsed = started.elapsed();

    println!("Parallel aggregation execution time: {:?}", elapsed);
    println!("Group results: {}", solution.len());
    println!("Stats: {:?}", stats);
    assert_eq!(solution.len(), 10);

    // NOTE(review): rows where `count` is unbound or not a literal are skipped,
    // so this per-group check passes vacuously for them — confirm that is intended.
    let count_var = variable("count");
    for row in &solution {
        let Some(Term::Literal(count_lit)) = row.get(&count_var) else {
            continue;
        };
        assert_eq!(count_lit.value, "100");
    }
}
/// Smoke test: a customised `ParallelConfig` can be installed on an
/// `ExecutionContext` and used to construct a `QueryExecutor`.
///
/// No query is executed and nothing is asserted; the test only checks that
/// construction completes.
#[cfg(feature = "parallel")]
#[test]
fn test_parallel_configuration() {
    // Start from the defaults and override only the knobs under test.
    let mut tuned = ParallelConfig::default();
    tuned.work_stealing = true;
    tuned.chunk_size = 500;
    tuned.max_threads = 8;

    let mut ctx = ExecutionContext::default();
    ctx.parallel_config = tuned;
    ctx.parallel = true;

    // Only construction is exercised; the executor is intentionally unused.
    let _executor = QueryExecutor::with_context(ctx);
}