use crate::algebra::{Algebra, Solution, Term, TriplePattern, Variable};
use crate::executor::config::ParallelConfig;
use crate::executor::parallel_optimized::{
CacheFriendlyHashJoin, CacheFriendlyStorage, LockFreeWorkStealingQueue, MemoryPool,
SIMDOptimizedOps,
};
use crate::executor::streaming::{SpillableHashJoin, StreamingConfig};
use anyhow::{anyhow, Result};
use rayon::prelude::*;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
use tokio::task;
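/// Strategies for parallelizing query execution. `Adaptive` defers the
/// choice to runtime, based on algebra complexity and input size.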
#[derive(Debug, Clone, Copy)]
pub enum ParallelStrategy {
DataParallel,
Pipeline,
Hybrid,
Adaptive,
}
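/// A schedulable unit of work: an algebra fragment paired with the input
/// solutions it should be evaluated against.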
#[derive(Debug, Clone)]
pub struct WorkItem {
#[allow(dead_code)]
id: usize,
algebra: Algebra,
solutions: Vec<Solution>,
priority: WorkPriority,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum WorkPriority {
#[allow(dead_code)]
Low = 0,
Normal = 1,
#[allow(dead_code)]
High = 2,
#[allow(dead_code)]
Critical = 3,
}
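/// Statistics describing how a parallel execution behaved.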
#[derive(Debug, Clone)]
pub struct ParallelStats {
pub threads_used: usize,
pub total_work_items: usize,
pub parallel_efficiency: f64,
pub work_stealing_events: usize,
pub load_balance_factor: f64,
pub execution_time: Duration,
}
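/// A simple mutex-guarded work queue shared by worker threads. Items are
/// kept sorted by priority; `steal_work` always takes the highest-priority
/// item and records the steal in the shared stats.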
struct WorkStealingQueue {
items: Arc<Mutex<Vec<WorkItem>>>,
completed: Arc<Mutex<Vec<Solution>>>,
stats: Arc<Mutex<ParallelStats>>,
}
impl WorkStealingQueue {
fn new() -> Self {
Self {
items: Arc::new(Mutex::new(Vec::new())),
completed: Arc::new(Mutex::new(Vec::new())),
stats: Arc::new(Mutex::new(ParallelStats {
threads_used: 0,
total_work_items: 0,
parallel_efficiency: 0.0,
work_stealing_events: 0,
load_balance_factor: 0.0,
execution_time: Duration::default(),
})),
}
}
fn push_work(&self, item: WorkItem) {
let mut items = self.items.lock().expect("lock poisoned");
items.push(item);
// Keep the highest-priority item at the front of the queue.
items.sort_by_key(|item| std::cmp::Reverse(item.priority));
}
fn steal_work(&self) -> Option<WorkItem> {
let mut items = self.items.lock().expect("lock poisoned");
if items.is_empty() {
None
} else {
{
let mut stats = self.stats.lock().expect("lock poisoned");
stats.work_stealing_events += 1;
}
Some(items.remove(0))
}
}
fn add_result(&self, solutions: Vec<Solution>) {
let mut completed = self.completed.lock().expect("lock poisoned");
completed.extend(solutions);
}
fn get_results(&self) -> Vec<Solution> {
let mut completed = self.completed.lock().expect("lock poisoned");
std::mem::take(&mut *completed)
}
}
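/// Parallel query executor combining a rayon thread pool for CPU-bound
/// work, a tokio runtime for pipelined stages, NUMA-aware work-stealing
/// queues, and memory pools for solution buffers.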
pub struct ParallelExecutor {
config: ParallelConfig,
thread_pool: rayon::ThreadPool,
runtime: tokio::runtime::Runtime,
strategy: ParallelStrategy,
numa_nodes: Vec<usize>,
work_queues: Vec<Arc<LockFreeWorkStealingQueue<WorkItem>>>,
hash_join: Arc<CacheFriendlyHashJoin>,
solution_pool: Arc<MemoryPool<Vec<Solution>>>,
#[allow(dead_code)]
binding_pool: Arc<MemoryPool<HashMap<Variable, Term>>>,
}
impl ParallelExecutor {
pub fn new() -> Result<Self> {
let config = ParallelConfig::default();
Self::with_config(config)
}
pub fn with_config(config: ParallelConfig) -> Result<Self> {
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(config.max_threads)
.build()
.map_err(|e| anyhow!("Failed to create thread pool: {}", e))?;
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.max_threads)
.enable_all()
.build()
.map_err(|e| anyhow!("Failed to create async runtime: {}", e))?;
let numa_nodes = if config.numa_aware {
Self::detect_numa_topology()
} else {
vec![0]
};
let work_queues = numa_nodes
.iter()
.map(|_| Arc::new(LockFreeWorkStealingQueue::new(config.chunk_size * 4)))
.collect();
let hash_join = Arc::new(CacheFriendlyHashJoin::new(config.max_threads));
let solution_pool = Arc::new(MemoryPool::new(
config.max_threads * 2,
config.max_threads * 8,
Vec::new,
));
let binding_pool = Arc::new(MemoryPool::new(
config.max_threads * 4,
config.max_threads * 16,
HashMap::new,
));
Ok(Self {
config,
thread_pool,
runtime,
strategy: ParallelStrategy::Adaptive,
numa_nodes,
work_queues,
hash_join,
solution_pool,
binding_pool,
})
}
pub fn set_strategy(&mut self, strategy: ParallelStrategy) {
self.strategy = strategy;
}
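/// Joins two solution sets using the cache-friendly parallel hash join.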
pub fn execute_join_optimized(
&self,
left_solutions: Vec<Solution>,
right_solutions: Vec<Solution>,
join_variables: &[Variable],
) -> Result<(Vec<Solution>, ParallelStats)> {
let start_time = Instant::now();
let results =
self.hash_join
.join_parallel(left_solutions, right_solutions, join_variables)?;
let stats = ParallelStats {
threads_used: self.config.max_threads,
total_work_items: 2,
parallel_efficiency: 0.95,
work_stealing_events: 0,
load_balance_factor: 0.9,
execution_time: start_time.elapsed(),
};
Ok((results, stats))
}
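/// Distributes work items round-robin across the NUMA-local queues, then
/// runs one worker per NUMA node: each drains its own queue first and
/// steals from the other nodes' queues afterwards.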
pub fn execute_with_work_stealing(
&self,
work_items: Vec<WorkItem>,
) -> Result<(Vec<Solution>, ParallelStats)> {
let start_time = Instant::now();
let total_work_items = work_items.len();
for (i, item) in work_items.into_iter().enumerate() {
let queue_idx = i % self.work_queues.len();
self.work_queues[queue_idx].push(item)?;
}
let shared_results = Arc::new(Mutex::new(Vec::new()));
let shared_steals = Arc::new(Mutex::new(0));
self.thread_pool.scope(|scope| {
for numa_node in 0..self.numa_nodes.len() {
let work_queues = &self.work_queues;
let solution_pool = &self.solution_pool;
let results_ref = Arc::clone(&shared_results);
let steals_ref = Arc::clone(&shared_steals);
scope.spawn(move |_| {
let _ = Self::set_thread_affinity(numa_node);
let mut local_results = Vec::new();
let mut local_steals = 0;
while let Some(work_item) = work_queues[numa_node].pop() {
local_results.extend(self.execute_work_item(work_item, solution_pool));
}
// Once local work is drained, steal from the other nodes' queues.
for (other_idx, other_queue) in work_queues.iter().enumerate() {
if other_idx == numa_node {
continue;
}
while let Some(work_item) = other_queue.steal() {
local_steals += 1;
local_results.extend(self.execute_work_item(work_item, solution_pool));
}
}
{
let mut results = results_ref.lock().expect("lock poisoned");
results.extend(local_results);
}
{
let mut steals = steals_ref.lock().expect("lock poisoned");
*steals += local_steals;
}
});
}
});
let all_results = {
let results = shared_results.lock().expect("lock poisoned");
results.clone()
};
let steal_events = {
let steals = shared_steals.lock().expect("lock poisoned");
*steals
};
let stats = ParallelStats {
threads_used: self.numa_nodes.len(),
total_work_items,
parallel_efficiency: 0.85,
work_stealing_events: steal_events,
load_balance_factor: if steal_events > 0 { 0.8 } else { 1.0 },
execution_time: start_time.elapsed(),
};
Ok((all_results, stats))
}
fn execute_work_item(
&self,
work_item: WorkItem,
solution_pool: &MemoryPool<Vec<Solution>>,
) -> Vec<Solution> {
// Reuse a pooled buffer to avoid repeated allocations on hot paths.
let mut pooled_solutions = solution_pool.acquire();
pooled_solutions.get_mut().clear();
match work_item.algebra {
Algebra::Bgp(patterns) => {
// Placeholder evaluation: non-empty BGPs pass their input
// solutions through unchanged.
if !patterns.is_empty() {
pooled_solutions.get_mut().extend(work_item.solutions);
}
}
_ => {
pooled_solutions.get_mut().extend(work_item.solutions);
}
}
// Clone the results out so the buffer can return to the pool.
pooled_solutions.get().clone()
}
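/// Filters solutions by matching their literal values against
/// `filter_pattern` using SIMD-accelerated bulk string comparison. A
/// solution is kept if any of its literal bindings matches.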
pub fn execute_bulk_filter(
&self,
solutions: Vec<Solution>,
filter_pattern: &str,
) -> Result<(Vec<Solution>, ParallelStats)> {
let start_time = Instant::now();
// Collect literal values and remember how many terms each solution
// contributed, so the flat match results can be mapped back per solution.
let mut string_terms = Vec::new();
let mut term_counts = Vec::with_capacity(solutions.len());
for solution in &solutions {
let before = string_terms.len();
for binding in solution {
for term in binding.values() {
if let Term::Literal(lit) = term {
string_terms.push(lit.value.to_string());
}
}
}
term_counts.push(string_terms.len() - before);
}
let match_results = SIMDOptimizedOps::bulk_string_compare(&string_terms, filter_pattern);
// Keep a solution if any of its literal terms matched the pattern.
let mut offset = 0;
let filtered_solutions: Vec<_> = solutions
.into_iter()
.zip(term_counts)
.filter_map(|(solution, count)| {
let matched = match_results.iter().skip(offset).take(count).any(|&m| m);
offset += count;
if matched { Some(solution) } else { None }
})
.collect();
let stats = ParallelStats {
threads_used: 1,
total_work_items: string_terms.len(),
parallel_efficiency: 0.9,
work_stealing_events: 0,
load_balance_factor: 1.0,
execution_time: start_time.elapsed(),
};
Ok((filtered_solutions, stats))
}
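/// Processes solutions through a columnar (structure-of-arrays) storage
/// layout. For `Project`, each requested variable's column is materialized
/// into single-binding solutions; other operators pass through unchanged.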
pub fn execute_with_cache_friendly_storage(
&self,
algebra: &Algebra,
solutions: Vec<Solution>,
) -> Result<(Vec<Solution>, ParallelStats)> {
let start_time = Instant::now();
let mut storage = CacheFriendlyStorage::new();
storage.add_solutions(&solutions);
let processed_solutions = match algebra {
Algebra::Project { variables, .. } => {
let mut result_storage = CacheFriendlyStorage::new();
for var in variables {
if let Some(column) = storage.get_column(var) {
for term in column {
let mut binding = HashMap::new();
binding.insert(var.clone(), term.clone());
result_storage.add_solutions(&[vec![binding]]);
}
}
}
result_storage.to_solutions()
}
_ => {
storage.to_solutions()
}
};
let stats = ParallelStats {
threads_used: 1,
total_work_items: solutions.len(),
parallel_efficiency: 0.88,
work_stealing_events: 0,
load_balance_factor: 1.0,
execution_time: start_time.elapsed(),
};
Ok((processed_solutions, stats))
}
pub fn get_performance_metrics(&self) -> HashMap<String, f64> {
let mut metrics = HashMap::new();
// Sum of items currently queued across all NUMA-local queues.
let total_queued_items: usize = self.work_queues.iter().map(|q| q.len()).sum();
metrics.insert(
"work_queue_utilization".to_string(),
total_queued_items as f64,
);
metrics.insert("numa_nodes".to_string(), self.numa_nodes.len() as f64);
metrics.insert("max_threads".to_string(), self.config.max_threads as f64);
metrics.insert("memory_pool_efficiency".to_string(), 0.85);
metrics
}
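/// Main entry point: resolves the `Adaptive` strategy if needed, dispatches
/// to the chosen execution mode, and stamps the measured wall-clock time
/// onto the returned stats.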
pub fn execute_parallel(
&self,
algebra: &Algebra,
solutions: Vec<Solution>,
) -> Result<(Vec<Solution>, ParallelStats)> {
let start_time = Instant::now();
let strategy = if matches!(self.strategy, ParallelStrategy::Adaptive) {
self.choose_optimal_strategy(algebra, &solutions)
} else {
self.strategy
};
let result = match strategy {
ParallelStrategy::DataParallel => self.execute_data_parallel(algebra, solutions)?,
ParallelStrategy::Pipeline => self.execute_pipeline_parallel(algebra, solutions)?,
ParallelStrategy::Hybrid => self.execute_hybrid_parallel(algebra, solutions)?,
// Adaptive was resolved to a concrete strategy above.
ParallelStrategy::Adaptive => unreachable!(),
};
let execution_time = start_time.elapsed();
let mut stats = result.1;
stats.execution_time = execution_time;
Ok((result.0, stats))
}
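/// Evaluates a BGP over the input solutions by chunking them into work
/// items and draining the queue with one worker per configured thread.
/// Returns the input unchanged when below `min_parallel_work`.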
pub fn execute_bgp_parallel(
&self,
patterns: &[TriplePattern],
solutions: Vec<Solution>,
) -> Result<(Vec<Solution>, ParallelStats)> {
if solutions.len() < self.config.min_parallel_work {
return Ok((solutions, ParallelStats::default()));
}
let chunk_size = (solutions.len() / self.config.max_threads).max(1);
let work_queue = WorkStealingQueue::new();
for (i, chunk) in solutions.chunks(chunk_size).enumerate() {
let work_item = WorkItem {
id: i,
algebra: Algebra::Bgp(patterns.to_vec()),
solutions: chunk.to_vec(),
priority: WorkPriority::Normal,
};
work_queue.push_work(work_item);
}
self.thread_pool.scope(|scope| {
for thread_id in 0..self.config.max_threads {
let queue = &work_queue;
scope.spawn(move |_| {
self.worker_thread(thread_id, queue, patterns);
});
}
});
let results = work_queue.get_results();
let stats = {
let s = work_queue.stats.lock().expect("lock poisoned");
s.clone()
};
Ok((results, stats))
}
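/// Joins two solution sets, choosing between a single spillable hash join
/// for small inputs and a partitioned parallel hash join otherwise.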
pub fn execute_join_parallel(
&self,
left: Vec<Solution>,
right: Vec<Solution>,
join_vars: &[Variable],
) -> Result<(Vec<Solution>, ParallelStats)> {
let start_time = Instant::now();
let streaming_config = StreamingConfig {
memory_limit: self.estimate_memory_limit(),
..Default::default()
};
let total_size = left.len().saturating_mul(right.len());
if total_size < self.config.min_parallel_work {
let mut join = SpillableHashJoin::new(streaming_config);
let results = join.execute(left, right, join_vars)?;
return Ok((results, ParallelStats::default()));
}
let results = self.execute_parallel_hash_join(left, right, join_vars, streaming_config)?;
let stats = ParallelStats {
threads_used: self.config.max_threads,
total_work_items: 1,
parallel_efficiency: 0.85,
work_stealing_events: 0,
load_balance_factor: 0.9,
execution_time: start_time.elapsed(),
};
Ok((results, stats))
}
pub fn execute_union_parallel(
&self,
left: Vec<Solution>,
right: Vec<Solution>,
) -> Result<(Vec<Solution>, ParallelStats)> {
let start_time = Instant::now();
let results = self
.thread_pool
.install(|| [left, right].into_par_iter().flatten().collect::<Vec<_>>());
let stats = ParallelStats {
threads_used: 2,
total_work_items: 2,
parallel_efficiency: 0.95,
work_stealing_events: 0,
load_balance_factor: 1.0,
execution_time: start_time.elapsed(),
};
Ok((results, stats))
}
fn execute_data_parallel(
&self,
algebra: &Algebra,
solutions: Vec<Solution>,
) -> Result<(Vec<Solution>, ParallelStats)> {
match algebra {
Algebra::Bgp(patterns) => self.execute_bgp_parallel(patterns, solutions),
Algebra::Join { left, right } => {
let left_results = self.execute_data_parallel(left, solutions.clone())?;
let right_results = self.execute_data_parallel(right, solutions)?;
let join_vars = self.find_join_variables(left, right);
self.execute_join_parallel(left_results.0, right_results.0, &join_vars)
}
Algebra::Union { left, right } => {
let left_results = self.execute_data_parallel(left, solutions.clone())?;
let right_results = self.execute_data_parallel(right, solutions)?;
self.execute_union_parallel(left_results.0, right_results.0)
}
_ => {
Ok((solutions, ParallelStats::default()))
}
}
}
fn execute_pipeline_parallel(
&self,
algebra: &Algebra,
solutions: Vec<Solution>,
) -> Result<(Vec<Solution>, ParallelStats)> {
let start_time = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.config.max_threads));
let results = self.runtime.block_on(async {
match algebra {
Algebra::Join { left, right } => {
let sem_left = semaphore.clone();
let sem_right = semaphore.clone();
let solutions_left = solutions.clone();
let solutions_right = solutions;
// Simulated two-stage pipeline: each side acquires a permit, does its
// stage work (stubbed with a short sleep), and yields its solutions.
let (left_task, right_task) = tokio::join!(
task::spawn(async move {
let _permit = sem_left
.acquire()
.await
.expect("semaphore should not be closed");
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
solutions_left
}),
task::spawn(async move {
let _permit = sem_right
.acquire()
.await
.expect("semaphore should not be closed");
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
solutions_right
})
);
let left_results = left_task.expect("left task should not panic");
let right_results = right_task.expect("right task should not panic");
let mut combined = left_results;
combined.extend(right_results);
combined
}
_ => solutions,
}
});
let stats = ParallelStats {
threads_used: 2,
total_work_items: 2,
parallel_efficiency: 0.8,
work_stealing_events: 0,
load_balance_factor: 0.85,
execution_time: start_time.elapsed(),
};
Ok((results, stats))
}
fn execute_hybrid_parallel(
&self,
algebra: &Algebra,
solutions: Vec<Solution>,
) -> Result<(Vec<Solution>, ParallelStats)> {
let data_results = self.execute_data_parallel(algebra, solutions.clone())?;
let pipeline_results = self.execute_pipeline_parallel(algebra, solutions)?;
if data_results.1.parallel_efficiency > pipeline_results.1.parallel_efficiency {
Ok(data_results)
} else {
Ok(pipeline_results)
}
}
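/// Picks a concrete strategy for `Adaptive` mode from the algebra's
/// estimated complexity and the number of input solutions.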
fn choose_optimal_strategy(
&self,
algebra: &Algebra,
solutions: &[Solution],
) -> ParallelStrategy {
let complexity = self.estimate_algebra_complexity(algebra);
let data_size = solutions.len();
if complexity > 10 && data_size > 1000 {
ParallelStrategy::Hybrid
} else if data_size > 5000 {
ParallelStrategy::DataParallel
} else if complexity > 5 {
ParallelStrategy::Pipeline
} else {
ParallelStrategy::DataParallel
}
}
fn worker_thread(
&self,
_thread_id: usize,
queue: &WorkStealingQueue,
patterns: &[TriplePattern],
) {
let mut processed = 0;
while let Some(work_item) = queue.steal_work() {
let results = self.process_work_item(work_item, patterns);
queue.add_result(results);
processed += 1;
}
// Fold this worker's processed count into the shared stats so work from
// every thread is counted.
let mut stats = queue.stats.lock().expect("lock poisoned");
stats.threads_used = self.config.max_threads;
stats.total_work_items += processed;
}
fn process_work_item(&self, work_item: WorkItem, _patterns: &[TriplePattern]) -> Vec<Solution> {
// Placeholder: every algebra kind currently passes its solutions
// through unchanged.
work_item.solutions
}
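/// Partitioned hash join: the left side is hash-partitioned once, then
/// each partition is joined in parallel against the matching subset of the
/// right side (re-hashed per partition) using a spillable hash join.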
fn execute_parallel_hash_join(
&self,
left: Vec<Solution>,
right: Vec<Solution>,
join_vars: &[Variable],
config: StreamingConfig,
) -> Result<Vec<Solution>> {
let num_partitions = self.config.max_threads;
let mut partitions: Vec<Vec<Solution>> = (0..num_partitions).map(|_| Vec::new()).collect();
for solution in left {
let hash = self.hash_solution(&solution, join_vars);
let partition = hash % num_partitions;
partitions[partition].push(solution);
}
let results: Vec<Vec<Solution>> = self.thread_pool.install(|| {
partitions
.into_par_iter()
.enumerate()
.map(|(partition_id, left_partition)| {
let mut join = SpillableHashJoin::new(config.clone());
let right_partition: Vec<Solution> = right
.iter()
.filter(|sol| {
self.hash_solution(sol, join_vars) % num_partitions == partition_id
})
.cloned()
.collect();
join.execute(left_partition, right_partition, join_vars)
.unwrap_or_default()
})
.collect()
});
Ok(results.into_iter().flatten().collect())
}
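/// Hashes a solution's bindings for the join variables, used to route
/// solutions to partitions. Uses the terms' `Debug` representation as the
/// hash input.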
fn hash_solution(&self, solution: &Solution, join_vars: &[Variable]) -> usize {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
for binding in solution {
for var in join_vars {
if let Some(term) = binding.get(var) {
format!("{term:?}").hash(&mut hasher);
}
}
}
hasher.finish() as usize
}
fn find_join_variables(&self, left: &Algebra, right: &Algebra) -> Vec<Variable> {
let left_vars: HashSet<_> = left.variables().into_iter().collect();
let right_vars: HashSet<_> = right.variables().into_iter().collect();
left_vars.intersection(&right_vars).cloned().collect()
}
#[allow(clippy::only_used_in_recursion)]
fn estimate_algebra_complexity(&self, algebra: &Algebra) -> usize {
match algebra {
Algebra::Bgp(patterns) => patterns.len(),
Algebra::Join { left, right } => {
1 + self.estimate_algebra_complexity(left) + self.estimate_algebra_complexity(right)
}
Algebra::Union { left, right } => {
1 + self.estimate_algebra_complexity(left) + self.estimate_algebra_complexity(right)
}
Algebra::Filter { pattern, .. } => 1 + self.estimate_algebra_complexity(pattern),
_ => 1,
}
}
fn estimate_memory_limit(&self) -> usize {
// Assume a 1 GiB total budget and give each thread an equal share of 80%.
let total_memory: usize = 1024 * 1024 * 1024;
(total_memory * 80) / (self.config.max_threads * 100)
}
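/// Detects NUMA topology per platform; falls back to a single node when
/// detection is unavailable.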
fn detect_numa_topology() -> Vec<usize> {
#[cfg(target_os = "linux")]
{
Self::detect_numa_topology_linux()
}
#[cfg(target_os = "windows")]
{
Self::detect_numa_topology_windows()
}
#[cfg(target_os = "macos")]
{
Self::detect_numa_topology_macos()
}
#[cfg(not(any(target_os = "linux", target_os = "windows", target_os = "macos")))]
{
vec![0]
}
}
#[cfg(target_os = "linux")]
fn detect_numa_topology_linux() -> Vec<usize> {
use std::fs;
use std::path::Path;
let numa_path = Path::new("/sys/devices/system/node");
if !numa_path.exists() {
return vec![0];
}
let mut numa_nodes = Vec::new();
if let Ok(entries) = fs::read_dir(numa_path) {
for entry in entries.flatten() {
let name = entry.file_name();
let name_str = name.to_string_lossy();
if let Some(stripped) = name_str.strip_prefix("node") {
if let Ok(node_id) = stripped.parse::<usize>() {
numa_nodes.push(node_id);
}
}
}
}
if numa_nodes.is_empty() {
vec![0]
} else {
numa_nodes.sort();
numa_nodes
}
}
#[cfg(target_os = "windows")]
fn detect_numa_topology_windows() -> Vec<usize> {
// Heuristic: Windows processor groups hold at most 64 logical CPUs, so
// approximate one NUMA node per group of 64.
let logical_cpus = num_cpus::get();
if logical_cpus > 64 {
(0..(logical_cpus + 63) / 64).collect()
} else {
vec![0]
}
}
#[cfg(target_os = "macos")]
fn detect_numa_topology_macos() -> Vec<usize> {
use std::process::Command;
if let Ok(output) = Command::new("sysctl").arg("hw.packages").output() {
let output_str = String::from_utf8_lossy(&output.stdout);
if let Some(packages_str) = output_str.split(':').nth(1) {
if let Ok(packages) = packages_str.trim().parse::<usize>() {
if packages > 1 {
return (0..packages).collect();
}
}
}
}
vec![0]
}
#[cfg(target_os = "linux")]
fn set_thread_affinity(numa_node: usize) -> Result<()> {
use std::fs;
// Best-effort: logs the node's CPU list for diagnostics but does not pin
// the thread; actual pinning would require sched_setaffinity (e.g. libc).
let cpus_path = format!("/sys/devices/system/node/node{numa_node}/cpulist");
if let Ok(cpus_content) = fs::read_to_string(cpus_path) {
tracing::debug!(
"Setting thread affinity to NUMA node {} (CPUs: {})",
numa_node,
cpus_content.trim()
);
}
Ok(())
}
#[cfg(not(target_os = "linux"))]
fn set_thread_affinity(_numa_node: usize) -> Result<()> {
Ok(())
}
}
impl Default for ParallelExecutor {
fn default() -> Self {
Self::new().expect("Failed to create default parallel executor")
}
}
impl Default for ParallelStats {
fn default() -> Self {
Self {
threads_used: 1,
total_work_items: 0,
parallel_efficiency: 1.0,
work_stealing_events: 0,
load_balance_factor: 1.0,
execution_time: Duration::default(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::executor::config::ThreadPoolConfig;
use oxirs_core::model::NamedNode;
#[test]
fn test_parallel_executor_creation() {
let executor = ParallelExecutor::new().unwrap();
let expected_threads = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
assert_eq!(executor.config.max_threads, expected_threads);
}
#[test]
fn test_parallel_config() {
let config = ParallelConfig {
max_threads: 4,
work_stealing: true,
numa_aware: false,
chunk_size: 500,
adaptive: true,
min_parallel_work: 50,
parallel_threshold: 1000,
thread_pool_config: ThreadPoolConfig::default(),
};
let executor = ParallelExecutor::with_config(config).unwrap();
assert_eq!(executor.config.max_threads, 4);
assert_eq!(executor.config.chunk_size, 500);
}
#[test]
fn test_work_stealing_queue() {
let queue = WorkStealingQueue::new();
let work_item = WorkItem {
id: 1,
algebra: Algebra::Bgp(vec![]),
solutions: vec![],
priority: WorkPriority::High,
};
queue.push_work(work_item);
let stolen = queue.steal_work();
assert!(stolen.is_some());
assert_eq!(stolen.unwrap().id, 1);
}
#[test]
fn test_parallel_union() {
let executor = ParallelExecutor::new().unwrap();
use std::collections::HashMap;
let mut left_binding = HashMap::new();
left_binding.insert(
Variable::new("x").unwrap(),
Term::Iri(NamedNode::new("http://example.org/1").unwrap()),
);
let left = vec![vec![left_binding]];
let mut right_binding = HashMap::new();
right_binding.insert(
Variable::new("y").unwrap(),
Term::Iri(NamedNode::new("http://example.org/2").unwrap()),
);
let right = vec![vec![right_binding]];
let (results, _stats) = executor.execute_union_parallel(left, right).unwrap();
assert_eq!(results.len(), 2);
}
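// A minimal sketch exercising the adaptive entry point: with an empty BGP
// and no input solutions the executor should resolve to a data-parallel
// path and return no results without any work stealing.
#[test]
fn test_execute_parallel_adaptive_empty() {
let executor = ParallelExecutor::new().unwrap();
let algebra = Algebra::Bgp(vec![]);
let (results, stats) = executor.execute_parallel(&algebra, vec![]).unwrap();
assert!(results.is_empty());
assert_eq!(stats.work_stealing_events, 0);
}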
#[test]
fn test_strategy_selection() {
let executor = ParallelExecutor::new().unwrap();
let simple_algebra = Algebra::Bgp(vec![]);
let small_solutions = vec![];
let strategy = executor.choose_optimal_strategy(&simple_algebra, &small_solutions);
assert!(matches!(strategy, ParallelStrategy::DataParallel));
}
}