use crate::algebra::{Algebra, Binding, Solution, Term, TriplePattern, Variable};
use anyhow::Result;
use scirs2_core::error::CoreError;
use scirs2_core::memory::BufferPool;
use scirs2_core::memory_efficient::ChunkedArray;
use scirs2_core::metrics::{Counter, Histogram, Timer};
use scirs2_core::profiling::Profiler;
use scirs2_core::simd::SimdArray;
use scirs2_core::ndarray_ext::{Array1, Array2, ArrayView1, ArrayView2, Axis};
use scirs2_core::parallel_ops::{ParallelIterator, IntoParallelIterator};
use scirs2_core::random::{Rng, Random, seeded_rng, ThreadLocalRngPool, ScientificSliceRandom};
use scirs2_core::array; use scirs2_core::simd::SimdOps;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
/// Tuning knobs for the vectorized query executor.
#[derive(Debug, Clone)]
pub struct VectorizedConfig {
/// Row-count threshold and chunk length: inputs with more rows than this are
/// split into chunks of this many rows by `process_large_dataset`.
pub batch_size: usize,
/// Number of pieces the input rows are divided into by
/// `execute_parallel_vectorized`.
pub num_threads: usize,
/// Memory budget; a quarter of it sizes the executor's `BufferPool`.
/// Presumably expressed in bytes — TODO confirm.
pub memory_limit: usize,
/// When `false`, `process_large_dataset` executes the whole input in one go.
pub adaptive_chunking: bool,
/// Requested SIMD level.
/// NOTE(review): not read by any code visible in this file.
pub simd_level: SimdLevel,
/// Requested prefetch strategy.
/// NOTE(review): not read by any code visible in this file.
pub prefetch_strategy: PrefetchStrategy,
}
impl Default for VectorizedConfig {
    /// Returns the stock configuration: 4096-row batches, one thread per CPU
    /// reported by `num_cpus`, a `1 << 30` memory limit, adaptive chunking on,
    /// and the `Aggressive` SIMD level with `Adaptive` prefetching.
    fn default() -> Self {
        let detected_cpus = num_cpus::get();
        let default_memory_limit: usize = 1 << 30;
        Self {
            batch_size: 4096,
            num_threads: detected_cpus,
            memory_limit: default_memory_limit,
            adaptive_chunking: true,
            simd_level: SimdLevel::Aggressive,
            prefetch_strategy: PrefetchStrategy::Adaptive,
        }
    }
}
/// SIMD level selectable through `VectorizedConfig::simd_level`.
///
/// NOTE(review): no code visible in this file branches on the variant, so the
/// precise effect of each level is not established here.
#[derive(Debug, Clone, Copy)]
pub enum SimdLevel {
None,
Basic,
Advanced,
Aggressive,
}
/// Prefetch strategy selectable through `VectorizedConfig::prefetch_strategy`.
///
/// NOTE(review): no code visible in this file branches on the variant, so the
/// precise effect of each strategy is not established here.
#[derive(Debug, Clone, Copy)]
pub enum PrefetchStrategy {
None,
Sequential,
Adaptive,
Aggressive,
}
/// Snapshot of the executor's runtime statistics.
#[derive(Debug, Clone)]
pub struct VectorizedStats {
/// Incremented once per `update_statistics` call.
pub vectorized_ops: u64,
/// Starts at 0. NOTE(review): never updated by the code visible in this file.
pub simd_lanes_used: u64,
/// Latest throughput estimate in bytes per second, computed from the result
/// row count at an assumed 24 bytes per row.
pub memory_bandwidth: f64,
/// Result rows as a percentage of the configured batch size, capped at 100.
pub simd_efficiency: f64,
/// Starts at 0.0. NOTE(review): never updated by the code visible in this file.
pub cache_hit_ratio: f64,
/// Batch execution time as maintained by `update_statistics`.
pub avg_batch_time: std::time::Duration,
}
/// Dictionary-encoded triples stored column-wise.
///
/// The three id columns are allocated at full capacity up front; only the
/// first `row_count` entries of each column hold real rows.
#[derive(Debug)]
pub struct ColumnarData {
/// Dictionary id of each row's subject.
pub subjects: SimdArray<u64>,
/// Dictionary id of each row's predicate.
pub predicates: SimdArray<u64>,
/// Dictionary id of each row's object.
pub objects: SimdArray<u64>,
/// Maps a dictionary id back to its term.
pub dictionary: Arc<HashMap<u64, Term>>,
/// Maps a term to its dictionary id.
pub reverse_dict: Arc<HashMap<Term, u64>>,
/// Number of rows appended so far.
pub row_count: usize,
}
impl ColumnarData {
/// Creates an empty store whose three id columns are zero-filled arrays of
/// length `capacity`, with empty dictionaries and no rows.
pub fn new(capacity: usize) -> Self {
    let subjects = SimdArray::zeros(capacity);
    let predicates = SimdArray::zeros(capacity);
    let objects = SimdArray::zeros(capacity);
    let dictionary = Arc::new(HashMap::new());
    let reverse_dict = Arc::new(HashMap::new());
    Self {
        subjects,
        predicates,
        objects,
        dictionary,
        reverse_dict,
        row_count: 0,
    }
}
/// Appends one triple, encoding each term through the dictionary.
///
/// # Errors
/// Fails with "Columnar data capacity exceeded" when every preallocated slot
/// is already in use; nothing is modified in that case.
pub fn add_triple(&mut self, subject: Term, predicate: Term, object: Term) -> Result<()> {
    let slot = self.row_count;
    let capacity = self.subjects.len();
    if slot >= capacity {
        return Err(anyhow::anyhow!("Columnar data capacity exceeded"));
    }

    // Ids are assigned in subject, predicate, object order.
    let encoded_subject = self.get_or_create_id(&subject);
    let encoded_predicate = self.get_or_create_id(&predicate);
    let encoded_object = self.get_or_create_id(&object);

    self.subjects[slot] = encoded_subject;
    self.predicates[slot] = encoded_predicate;
    self.objects[slot] = encoded_object;
    self.row_count = slot + 1;
    Ok(())
}
/// Returns the dictionary id of `term`, assigning the next free id (the
/// current dictionary size) when the term has not been seen before.
///
/// `Clone for ColumnarData` shares both dictionaries through `Arc`, so they
/// may have several owners at this point. `Arc::make_mut` copies the map only
/// in that case (copy-on-write) instead of panicking the way
/// `Arc::get_mut(..).expect(..)` did after a clone.
fn get_or_create_id(&mut self, term: &Term) -> u64 {
    if let Some(&existing) = self.reverse_dict.get(term) {
        return existing;
    }
    let id = self.dictionary.len() as u64;
    Arc::make_mut(&mut self.dictionary).insert(id, term.clone());
    Arc::make_mut(&mut self.reverse_dict).insert(term.clone(), id);
    id
}
/// Builds a boolean mask with one entry per stored row; entry `i` is `true`
/// when row `i`'s predicate id equals `predicate_id`.
///
/// Only the first `row_count` entries of the predicate column are inspected.
/// The column itself is allocated at full capacity, so scanning the whole
/// column while writing into a `row_count`-sized mask would index past the
/// end of the mask whenever the store is not full.
pub fn vectorized_filter(&self, predicate_id: u64) -> Result<SimdArray<bool>> {
    let mut mask = SimdArray::zeros(self.row_count);
    for row in 0..self.row_count {
        mask[row] = self.predicates[row] == predicate_id;
    }
    Ok(mask)
}
pub fn vectorized_join(&self, other: &ColumnarData, join_column: JoinColumn) -> Result<ColumnarData> {
let mut result = ColumnarData::new(self.row_count.max(other.row_count));
let (left_col, right_col) = match join_column {
JoinColumn::Subject => (&self.subjects, &other.subjects),
JoinColumn::Predicate => (&self.predicates, &other.predicates),
JoinColumn::Object => (&self.objects, &other.objects),
};
for i in 0..self.row_count {
let left_val = left_col[i];
let matches = auto_vectorize(&right_col.view(), |chunk| -> Vec<usize> {
chunk.iter().enumerate()
.filter_map(|(j, &val)| if val == left_val { Some(j) } else { None })
.collect()
})?;
for &j in &matches {
result.add_triple(
self.dictionary[&self.subjects[i]].clone(),
self.dictionary[&self.predicates[i]].clone(),
other.dictionary[&other.objects[j]].clone(),
)?;
}
}
Ok(result)
}
}
/// Selects which id column `ColumnarData::vectorized_join` compares on both
/// sides of the join.
#[derive(Debug, Clone, Copy)]
pub enum JoinColumn {
/// Compare the subject columns.
Subject,
/// Compare the predicate columns.
Predicate,
/// Compare the object columns.
Object,
}
/// Executes algebra expressions over `ColumnarData` and tracks metrics.
pub struct VectorizedExecutor {
/// Configuration supplied at construction.
config: VectorizedConfig,
/// Byte buffer pool sized to a quarter of `config.memory_limit`.
/// NOTE(review): not used after construction in the code visible here.
buffer_pool: BufferPool<u8>,
/// Profiler; a "vectorized_execution" span wraps each `execute_vectorized`.
profiler: Profiler,
/// Statistics shared behind a mutex; read through `get_stats`.
stats: Arc<Mutex<VectorizedStats>>,
/// Incremented once per non-empty basic graph pattern executed.
vectorized_ops_counter: Counter,
/// Receives the efficiency percentage computed in `update_statistics`.
simd_efficiency_histogram: Histogram,
/// Receives the wall-clock time of each `execute_vectorized` call.
batch_processing_timer: Timer,
/// Incremented by the bandwidth estimate computed in `update_statistics`.
memory_bandwidth_gauge: Counter,
}
impl VectorizedExecutor {
/// Builds an executor from `config`, with a buffer pool sized to a quarter of
/// the memory limit, zeroed statistics and freshly named metrics.
///
/// # Errors
/// Propagates the error from `BufferPool::new`.
pub fn new(config: VectorizedConfig) -> Result<Self> {
    let pool_capacity = config.memory_limit / 4;
    let buffer_pool = BufferPool::new(pool_capacity)?;
    let profiler = Profiler::new();

    let zeroed_stats = VectorizedStats {
        vectorized_ops: 0,
        simd_lanes_used: 0,
        memory_bandwidth: 0.0,
        simd_efficiency: 0.0,
        cache_hit_ratio: 0.0,
        avg_batch_time: std::time::Duration::from_millis(0),
    };
    let stats = Arc::new(Mutex::new(zeroed_stats));

    let executor = Self {
        config,
        buffer_pool,
        profiler,
        stats,
        vectorized_ops_counter: Counter::new("vectorized_ops"),
        simd_efficiency_histogram: Histogram::new("simd_efficiency"),
        batch_processing_timer: Timer::new("batch_processing"),
        memory_bandwidth_gauge: Counter::new("memory_bandwidth"),
    };
    Ok(executor)
}
/// Evaluates `algebra` against `data` and returns the resulting solution.
///
/// BGP, join, filter and union nodes are dispatched to their dedicated
/// handlers; every other node goes to `execute_scalar_fallback`.
///
/// The "vectorized_execution" profiler span is closed on both the success and
/// the error path. Previously an early `?` return left the span started and
/// never stopped. The batch timer and statistics are updated on success only,
/// as before.
///
/// # Errors
/// Propagates whatever error the selected handler returns.
pub fn execute_vectorized(&mut self, algebra: &Algebra, data: &ColumnarData) -> Result<Solution> {
    self.profiler.start("vectorized_execution");
    let start_time = Instant::now();

    // Keep the outcome as a `Result` so the cleanup below always runs.
    let outcome = match algebra {
        Algebra::Bgp(patterns) => self.execute_vectorized_bgp(patterns, data),
        Algebra::Join { left, right } => self.execute_vectorized_join(left, right, data),
        Algebra::Filter { pattern, condition } => {
            self.execute_vectorized_filter(pattern, condition, data)
        }
        Algebra::Union { left, right } => self.execute_vectorized_union(left, right, data),
        _ => self.execute_scalar_fallback(algebra, data),
    };

    let execution_time = start_time.elapsed();
    if outcome.is_ok() {
        self.batch_processing_timer.record(execution_time);
    }
    self.profiler.stop("vectorized_execution");

    let result = outcome?;
    self.update_statistics(execution_time, result.len());
    Ok(result)
}
/// Evaluates a basic graph pattern: each triple pattern is filtered against
/// the full `data`, the filtered sets are joined, and the joined rows are
/// converted to a solution. An empty pattern list yields an empty solution
/// without touching the operation counter.
fn execute_vectorized_bgp(&mut self, patterns: &[TriplePattern], data: &ColumnarData) -> Result<Solution> {
    if patterns.is_empty() {
        return Ok(Solution::new());
    }
    self.vectorized_ops_counter.increment();

    // Stops at the first pattern whose filter fails.
    let per_pattern = patterns
        .iter()
        .map(|pattern| self.apply_vectorized_pattern_filter(pattern, data))
        .collect::<Result<Vec<_>>>()?;

    let joined = self.vectorized_multi_join(&per_pattern)?;
    self.columnar_to_solution(&joined)
}
fn apply_vectorized_pattern_filter(&self, pattern: &TriplePattern, data: &ColumnarData) -> Result<ColumnarData> {
let mut filtered = ColumnarData::new(data.row_count);
let subject_mask = if let Term::Variable(_) = &pattern.subject {
SimdArray::ones(data.row_count) } else {
let target_id = data.reverse_dict.get(&pattern.subject)
.ok_or_else(|| anyhow::anyhow!("Subject not found in dictionary"))?;
self.create_equality_mask(&data.subjects, *target_id)?
};
let predicate_mask = if let Term::Variable(_) = &pattern.predicate {
SimdArray::ones(data.row_count)
} else {
let target_id = data.reverse_dict.get(&pattern.predicate)
.ok_or_else(|| anyhow::anyhow!("Predicate not found in dictionary"))?;
self.create_equality_mask(&data.predicates, *target_id)?
};
let object_mask = if let Term::Variable(_) = &pattern.object {
SimdArray::ones(data.row_count)
} else {
let target_id = data.reverse_dict.get(&pattern.object)
.ok_or_else(|| anyhow::anyhow!("Object not found in dictionary"))?;
self.create_equality_mask(&data.objects, *target_id)?
};
let combined_mask = self.simd_and_masks(&[&subject_mask, &predicate_mask, &object_mask])?;
self.apply_simd_mask(&combined_mask, data, &mut filtered)?;
Ok(filtered)
}
/// Builds a mask as long as `column` whose entries flag the positions where
/// the column value equals `target`.
///
/// NOTE(review): the index written to is relative to the chunk handed over by
/// `auto_vectorize`. If that helper ever delivers more than one chunk, later
/// chunks would overwrite the first entries — confirm its chunking contract.
fn create_equality_mask(&self, column: &SimdArray<u64>, target: u64) -> Result<SimdArray<bool>> {
    let mut equal_flags = SimdArray::zeros(column.len());
    auto_vectorize(&column.view(), |lane| {
        for (offset, &value) in lane.iter().enumerate() {
            equal_flags[offset] = value == target;
        }
    })?;
    Ok(equal_flags)
}
/// AND-combines `masks` element-wise into a copy of the first mask.
///
/// When a later mask is shorter than the first, only the overlapping prefix
/// is combined and the remaining entries keep the first mask's values. This
/// matches the previous zip-based behaviour.
///
/// The combination is a plain indexed loop. The earlier version wrote into
/// `result` from inside a closure while a view of `result` was still
/// borrowed, and indexed with a chunk-relative offset.
///
/// # Errors
/// Fails with "No masks provided" when `masks` is empty.
fn simd_and_masks(&self, masks: &[&SimdArray<bool>]) -> Result<SimdArray<bool>> {
    let (first, rest) = masks
        .split_first()
        .ok_or_else(|| anyhow::anyhow!("No masks provided"))?;

    let mut result: SimdArray<bool> = (**first).clone();
    for mask in rest {
        let overlap = result.len().min(mask.len());
        for i in 0..overlap {
            let both = result[i] && mask[i];
            result[i] = both;
        }
    }
    Ok(result)
}
/// Copies every row of `source` whose mask entry is `true` into `target`,
/// decoding ids through `source`'s dictionary and re-encoding them in
/// `target` via `add_triple`.
///
/// # Errors
/// Propagates any error from `add_triple`.
fn apply_simd_mask(&self, mask: &SimdArray<bool>, source: &ColumnarData, target: &mut ColumnarData) -> Result<()> {
    let decode = |id: u64| source.dictionary[&id].clone();
    for row in (0..source.row_count).filter(|&row| mask[row]) {
        target.add_triple(
            decode(source.subjects[row]),
            decode(source.predicates[row]),
            decode(source.objects[row]),
        )?;
    }
    Ok(())
}
/// Evaluates a join node: extracts the data for each operand, hash-joins the
/// two sets and converts the joined rows to a solution.
fn execute_vectorized_join(&mut self, left: &Algebra, right: &Algebra, data: &ColumnarData) -> Result<Solution> {
    let left_rows = self.extract_bgp_data(left, data)?;
    let right_rows = self.extract_bgp_data(right, data)?;
    self.vectorized_hash_join(&left_rows, &right_rows)
        .and_then(|joined| self.columnar_to_solution(&joined))
}
/// Hash-joins `left` and `right` on the key produced by `compute_join_key`.
///
/// Each matching pair yields one row made of the left row's subject and
/// predicate and the right row's object. Pairs are emitted in right-row
/// order, then in the order left rows were inserted into the hash table.
///
/// Two defects of the earlier version are addressed:
/// * The result was preallocated with `left.row_count * right.row_count`
///   slots, which is quadratic in memory and can overflow `usize`. Matches
///   are now collected first and the result is sized exactly.
/// * Ids are private to each store's dictionary. The stores joined here are
///   typically built independently by the pattern filter, so a right-hand
///   key is translated into the left store's id space through its term
///   before probing. This relies on `compute_join_key` returning a
///   dictionary id of the store it is given, which is what it does in this
///   file.
///
/// # Errors
/// Propagates errors from key computation, table construction and
/// `add_triple`.
fn vectorized_hash_join(&self, left: &ColumnarData, right: &ColumnarData) -> Result<ColumnarData> {
    let hash_table = self.build_vectorized_hash_table(left)?;

    let mut matched_rows: Vec<(usize, usize)> = Vec::new();
    for right_idx in 0..right.row_count {
        let probe_key = self.compute_join_key(right, right_idx)?;
        // A term unknown to the left store cannot match any left row.
        let left_key = right
            .dictionary
            .get(&probe_key)
            .and_then(|term| left.reverse_dict.get(term));
        if let Some(matching_indices) = left_key.and_then(|key| hash_table.get(key)) {
            matched_rows.extend(
                matching_indices
                    .iter()
                    .map(|&left_idx| (left_idx, right_idx)),
            );
        }
    }

    let mut result = ColumnarData::new(matched_rows.len());
    for (left_idx, right_idx) in matched_rows {
        result.add_triple(
            left.dictionary[&left.subjects[left_idx]].clone(),
            left.dictionary[&left.predicates[left_idx]].clone(),
            right.dictionary[&right.objects[right_idx]].clone(),
        )?;
    }
    Ok(result)
}
/// Groups the row indices of `data` by join key, preserving ascending row
/// order within each group.
///
/// # Errors
/// Propagates any error from `compute_join_key`.
fn build_vectorized_hash_table(&self, data: &ColumnarData) -> Result<HashMap<u64, Vec<usize>>> {
    let mut rows_by_key: HashMap<u64, Vec<usize>> = HashMap::new();
    (0..data.row_count).try_for_each(|row| -> Result<()> {
        let key = self.compute_join_key(data, row)?;
        rows_by_key.entry(key).or_default().push(row);
        Ok(())
    })?;
    Ok(rows_by_key)
}
/// Returns the join key of `row`, which is the row's subject id.
fn compute_join_key(&self, data: &ColumnarData, row: usize) -> Result<u64> {
    let subject_id = data.subjects[row];
    Ok(subject_id)
}
/// Evaluates the inner `pattern` and returns its solution unchanged.
///
/// NOTE(review): `_condition` is never evaluated, so rows that fail the
/// filter expression are still returned. TODO: apply the condition.
fn execute_vectorized_filter(&mut self, pattern: &Algebra, _condition: &crate::algebra::Expression, data: &ColumnarData) -> Result<Solution> {
    self.execute_vectorized(pattern, data)
}
/// Evaluates both operands and returns the left solution followed by the
/// right one; duplicates are kept.
fn execute_vectorized_union(&mut self, left: &Algebra, right: &Algebra, data: &ColumnarData) -> Result<Solution> {
    let mut combined = self.execute_vectorized(left, data)?;
    let right_rows = self.execute_vectorized(right, data)?;
    combined.extend(right_rows);
    Ok(combined)
}
/// Left-folds `data_sets` with `vectorized_hash_join`, starting from a clone
/// of the first set. An empty slice yields an empty zero-capacity store.
fn vectorized_multi_join(&self, data_sets: &[ColumnarData]) -> Result<ColumnarData> {
    match data_sets.split_first() {
        None => Ok(ColumnarData::new(0)),
        Some((first, remaining)) => remaining
            .iter()
            .try_fold(first.clone(), |joined, next| {
                self.vectorized_hash_join(&joined, next)
            }),
    }
}
/// Converts every stored row into a binding of the variables `s`, `p` and `o`
/// to the row's decoded subject, predicate and object.
///
/// NOTE(review): the variable names are fixed and do not come from the query
/// pattern — confirm this is what callers expect.
fn columnar_to_solution(&self, data: &ColumnarData) -> Result<Solution> {
    let mut solution = Solution::new();
    for row in 0..data.row_count {
        let columns = [
            ("s", data.subjects[row]),
            ("p", data.predicates[row]),
            ("o", data.objects[row]),
        ];
        let mut binding = Binding::new();
        for (name, id) in columns {
            binding.insert(Variable::new(name), data.dictionary[&id].clone());
        }
        solution.push(binding);
    }
    Ok(solution)
}
/// Produces the rows of `data` selected by `algebra` for use as a join
/// operand.
///
/// * A BGP with no patterns imposes no constraint, so the data is returned
///   unfiltered. Previously `patterns[0]` panicked here.
/// * A BGP with one pattern is filtered by that pattern.
/// * A BGP with several patterns filters by each pattern and joins the
///   results, mirroring `execute_vectorized_bgp`. Previously every pattern
///   after the first was silently ignored.
/// * Any other node returns the data unfiltered.
///
/// # Errors
/// Propagates errors from the pattern filter and the multi-join.
fn extract_bgp_data(&self, algebra: &Algebra, data: &ColumnarData) -> Result<ColumnarData> {
    match algebra {
        Algebra::Bgp(patterns) => match &patterns[..] {
            [] => Ok(data.clone()),
            [only] => self.apply_vectorized_pattern_filter(only, data),
            many => {
                let filtered = many
                    .iter()
                    .map(|pattern| self.apply_vectorized_pattern_filter(pattern, data))
                    .collect::<Result<Vec<_>>>()?;
                self.vectorized_multi_join(&filtered)
            }
        },
        _ => Ok(data.clone()),
    }
}
/// Placeholder for algebra nodes without a vectorized handler; it ignores
/// both arguments and returns an empty solution.
///
/// NOTE(review): unsupported nodes therefore produce no rows rather than an
/// error — confirm this is intended.
fn execute_scalar_fallback(&self, _algebra: &Algebra, _data: &ColumnarData) -> Result<Solution> {
    let no_rows = Solution::new();
    Ok(no_rows)
}
/// Folds one finished execution into the shared statistics and metrics.
///
/// * `avg_batch_time` is now a running mean over all recorded executions.
///   It used to be overwritten with the latest duration.
/// * Efficiency is result rows as a percentage of the batch size. The stored
///   value is capped at 100 while the histogram receives the uncapped value,
///   as before. A zero batch size gives 0 instead of inf/NaN.
/// * Bandwidth assumes 24 bytes per result row. A zero-length duration gives
///   0 instead of infinity, which previously saturated the counter increment
///   to `u64::MAX`.
///
/// A poisoned statistics lock is silently skipped (best effort).
fn update_statistics(&self, execution_time: std::time::Duration, result_count: usize) {
    if let Ok(mut stats) = self.stats.lock() {
        stats.vectorized_ops += 1;

        // Incremental mean: avg += (sample - avg) / n.
        let completed = stats.vectorized_ops as f64;
        let previous_avg = stats.avg_batch_time.as_secs_f64();
        let elapsed_secs = execution_time.as_secs_f64();
        let updated_avg = previous_avg + (elapsed_secs - previous_avg) / completed;
        // Clamp guards against a tiny negative value from float rounding.
        stats.avg_batch_time = std::time::Duration::from_secs_f64(updated_avg.max(0.0));

        let efficiency = if self.config.batch_size == 0 {
            0.0
        } else {
            (result_count as f64 / self.config.batch_size as f64) * 100.0
        };
        stats.simd_efficiency = efficiency.min(100.0);
        self.simd_efficiency_histogram.record(efficiency);

        // Computed in f64 so a huge row count cannot overflow `usize`.
        let bytes_processed = result_count as f64 * 24.0;
        let bandwidth = if elapsed_secs > 0.0 {
            bytes_processed / elapsed_secs
        } else {
            0.0
        };
        stats.memory_bandwidth = bandwidth;
        self.memory_bandwidth_gauge.increment_by(bandwidth as u64);
    }
}
/// Returns a copy of the current statistics.
///
/// # Panics
/// Panics if the statistics mutex is poisoned.
pub fn get_stats(&self) -> VectorizedStats {
    let guard = self.stats.lock().expect("lock should not be poisoned");
    (*guard).clone()
}
pub fn process_large_dataset(&mut self, data: &ColumnarData, algebra: &Algebra) -> Result<Solution> {
if !self.config.adaptive_chunking || data.row_count <= self.config.batch_size {
return self.execute_vectorized(algebra, data);
}
let chunking = AdaptiveChunking::new()
.with_memory_limit(self.config.memory_limit)
.with_target_chunk_size(self.config.batch_size)
.build()?;
let mut final_result = Solution::new();
for chunk_start in (0..data.row_count).step_by(self.config.batch_size) {
let chunk_end = (chunk_start + self.config.batch_size).min(data.row_count);
let chunk_data = self.extract_chunk(data, chunk_start, chunk_end)?;
let chunk_result = self.execute_vectorized(algebra, &chunk_data)?;
final_result.extend(chunk_result);
}
Ok(final_result)
}
/// Copies rows `start..end` of `data` into a new store sized for exactly that
/// many rows, re-encoding the terms in the new store's own dictionary.
///
/// # Errors
/// Propagates any error from `add_triple`.
fn extract_chunk(&self, data: &ColumnarData, start: usize, end: usize) -> Result<ColumnarData> {
    let mut chunk = ColumnarData::new(end - start);
    let decode = |id: u64| data.dictionary[&id].clone();
    (start..end).try_for_each(|row| {
        chunk.add_triple(
            decode(data.subjects[row]),
            decode(data.predicates[row]),
            decode(data.objects[row]),
        )
    })?;
    Ok(chunk)
}
/// Splits `data` into at most `num_threads` row ranges, executes `algebra` on
/// each range and concatenates the per-range solutions in range order.
///
/// Inputs that fit in one batch are executed directly.
///
/// The range length is the row count divided by the worker count, rounded
/// up, with the worker count clamped to at least 1. The earlier floor
/// division panicked on `num_threads == 0`, reached `step_by(0)` when there
/// were more threads than rows, and produced an extra tail range whenever the
/// row count was not a multiple of the thread count.
///
/// NOTE(review): every spawned task calls `self.execute_vectorized`, which
/// takes `&mut self`. Whether that is accepted depends on `par_scope`, which
/// is not visible in this file — confirm.
///
/// # Errors
/// Propagates the first error from chunk extraction or execution.
pub fn execute_parallel_vectorized(&mut self, algebra: &Algebra, data: &ColumnarData) -> Result<Solution> {
    if data.row_count <= self.config.batch_size {
        return self.execute_vectorized(algebra, data);
    }

    // `row_count` is at least 1 here, so the rounded-up quotient is too.
    let workers = self.config.num_threads.max(1);
    let chunk_size =
        data.row_count / workers + usize::from(data.row_count % workers != 0);

    let chunks: Vec<_> = (0..data.row_count)
        .step_by(chunk_size)
        .map(|start| {
            let end = (start + chunk_size).min(data.row_count);
            (start, end)
        })
        .collect();
    let results: Result<Vec<Solution>> = par_scope(|s| {
        chunks.into_iter()
            .map(|(start, end)| {
                s.spawn(move |_| {
                    let chunk_data = self.extract_chunk(data, start, end)?;
                    self.execute_vectorized(algebra, &chunk_data)
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .collect::<Result<Vec<_>>>()
    });
    let mut final_result = Solution::new();
    for result in results? {
        final_result.extend(result);
    }
    Ok(final_result)
}
}
impl Clone for ColumnarData {
    /// Copies the three id columns and the row count; both dictionaries are
    /// shared with the original through their `Arc`s rather than duplicated.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field forces this impl to be updated.
        let Self {
            subjects,
            predicates,
            objects,
            dictionary,
            reverse_dict,
            row_count,
        } = self;
        Self {
            subjects: subjects.clone(),
            predicates: predicates.clone(),
            objects: objects.clone(),
            dictionary: Arc::clone(dictionary),
            reverse_dict: Arc::clone(reverse_dict),
            row_count: *row_count,
        }
    }
}
/// Bundles an executor with the switches `execute_optimal` uses to pick an
/// execution path.
pub struct VectorizedExecutionContext {
/// Executor that performs the actual work.
pub executor: VectorizedExecutor,
/// Copy of the configuration the executor was built from; its `batch_size`
/// provides the size thresholds used by `execute_optimal`.
pub config: VectorizedConfig,
/// Allows the parallel path for inputs larger than twice the batch size.
pub enable_parallel: bool,
/// Allows the chunked path for inputs larger than one batch.
pub enable_adaptive_chunking: bool,
}
impl VectorizedExecutionContext {
/// Builds a context around a fresh executor, with both the parallel and the
/// chunked path enabled.
///
/// # Errors
/// Propagates the error from `VectorizedExecutor::new`.
pub fn new(config: VectorizedConfig) -> Result<Self> {
    VectorizedExecutor::new(config.clone()).map(|executor| Self {
        executor,
        config,
        enable_parallel: true,
        enable_adaptive_chunking: true,
    })
}
/// Picks an execution path from the input size: parallel above twice the
/// batch size, chunked above one batch, direct otherwise. Each of the first
/// two paths is taken only when its switch is enabled.
pub fn execute_optimal(&mut self, algebra: &Algebra, data: &ColumnarData) -> Result<Solution> {
    let rows = data.row_count;
    let batch = self.config.batch_size;

    if self.enable_parallel && rows > batch * 2 {
        return self.executor.execute_parallel_vectorized(algebra, data);
    }
    if self.enable_adaptive_chunking && rows > batch {
        return self.executor.process_large_dataset(data, algebra);
    }
    self.executor.execute_vectorized(algebra, data)
}
/// Summarises the executor's current statistics as a report.
///
/// Bandwidth is converted from bytes per second to MiB per second. The batch
/// time is reported in fractional milliseconds; the earlier
/// `as_millis() as f64` conversion truncated to whole milliseconds, so
/// sub-millisecond batches were reported as 0.
pub fn get_performance_report(&self) -> VectorizedPerformanceReport {
    let stats = self.executor.get_stats();
    VectorizedPerformanceReport {
        total_operations: stats.vectorized_ops,
        avg_simd_efficiency: stats.simd_efficiency,
        memory_bandwidth_mbps: stats.memory_bandwidth / (1024.0 * 1024.0),
        cache_hit_ratio: stats.cache_hit_ratio,
        avg_batch_processing_ms: stats.avg_batch_time.as_secs_f64() * 1000.0,
        simd_lanes_utilized: stats.simd_lanes_used,
    }
}
}
/// Flattened view of `VectorizedStats` produced by
/// `VectorizedExecutionContext::get_performance_report`.
#[derive(Debug, Clone)]
pub struct VectorizedPerformanceReport {
/// Copied from `VectorizedStats::vectorized_ops`.
pub total_operations: u64,
/// Copied from `VectorizedStats::simd_efficiency`.
pub avg_simd_efficiency: f64,
/// `VectorizedStats::memory_bandwidth` divided by 1024 * 1024.
pub memory_bandwidth_mbps: f64,
/// Copied from `VectorizedStats::cache_hit_ratio`.
pub cache_hit_ratio: f64,
/// `VectorizedStats::avg_batch_time` expressed in milliseconds.
pub avg_batch_processing_ms: f64,
/// Copied from `VectorizedStats::simd_lanes_used`.
pub simd_lanes_utilized: u64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use oxirs_core::model::NamedNode;

    /// Wraps an IRI string in a `Term::Iri`, panicking if it is rejected.
    fn iri(value: impl Into<String>) -> Term {
        Term::Iri(NamedNode::new(value.into()).unwrap())
    }

    /// A single triple can be appended to a fresh store.
    #[test]
    fn test_columnar_data_creation() {
        let mut data = ColumnarData::new(100);
        let inserted = data.add_triple(
            iri("http://example.org/subject"),
            iri("http://example.org/predicate"),
            iri("http://example.org/object"),
        );
        assert!(inserted.is_ok());
        assert_eq!(data.row_count, 1);
    }

    /// The default configuration yields a working executor.
    #[test]
    fn test_vectorized_executor_creation() {
        let executor = VectorizedExecutor::new(VectorizedConfig::default());
        assert!(executor.is_ok());
    }

    /// Filtering a partially filled store by predicate id succeeds.
    #[test]
    fn test_vectorized_filter() {
        let mut data = ColumnarData::new(10);
        for i in 0..5 {
            let subject = iri(format!("http://example.org/s{i}"));
            let predicate = iri("http://example.org/predicate");
            let object = iri(format!("http://example.org/o{i}"));
            data.add_triple(subject, predicate, object).unwrap();
        }
        let pred_id = 1;
        let mask = data.vectorized_filter(pred_id);
        assert!(mask.is_ok());
    }

    /// A fresh executor reports zeroed statistics.
    #[test]
    fn test_performance_metrics() {
        let executor = VectorizedExecutor::new(VectorizedConfig::default()).unwrap();
        let stats = executor.get_stats();
        assert_eq!(stats.vectorized_ops, 0);
        assert_eq!(stats.simd_efficiency, 0.0);
    }
}