use crate::algebra::{Binding, Term, TriplePattern, Variable};
use anyhow::Result;
use std::collections::HashMap;
use rayon::prelude::*;
use scirs2_core::ndarray_ext::{Array1, Array2};
/// Tests every element of `values` for exact equality with `threshold`.
///
/// Returns one flag per input element, in input order; an empty slice
/// yields an empty vector.
///
/// The comparison is performed on the integers themselves. Converting the
/// operands to `f64` and comparing with a tolerance would be lossy: `f64`
/// carries only 53 bits of mantissa, so distinct 64-bit hashes that round
/// to the same float would be reported as equal.
fn simd_compare_eq(values: &[u64], threshold: u64) -> Vec<bool> {
    values.iter().map(|&value| value == threshold).collect()
}
/// Tests every element of `values` for approximate equality with `threshold`.
///
/// An element counts as equal when `|value - threshold| < f64::EPSILON`.
/// A NaN difference (NaN operand, or two infinities of the same sign)
/// never satisfies that bound, so such elements map to `false`.
///
/// Returns one flag per input element, in input order.
fn simd_compare_eq_f64(values: &[f64], threshold: f64) -> Vec<bool> {
    // A single pass over the slice: no temporary copies of the input or of a
    // broadcast threshold are needed to evaluate the predicate.
    values
        .iter()
        .map(|&value| (value - threshold).abs() < f64::EPSILON)
        .collect()
}
/// Element-wise `value > threshold`.
///
/// Produces one flag per input element. Slices shorter than 16 elements are
/// scanned sequentially; longer ones are mapped through rayon's parallel
/// iterator.
fn simd_compare_gt(values: &[f64], threshold: f64) -> Vec<bool> {
    let is_above = move |value: &f64| *value > threshold;
    if values.len() >= 16 {
        values.par_iter().map(is_above).collect()
    } else {
        values.iter().map(is_above).collect()
    }
}
/// Element-wise `value < threshold`.
///
/// Produces one flag per input element. Slices shorter than 16 elements are
/// scanned sequentially; longer ones are mapped through rayon's parallel
/// iterator.
fn simd_compare_lt(values: &[f64], threshold: f64) -> Vec<bool> {
    let is_below = move |value: &f64| *value < threshold;
    if values.len() >= 16 {
        values.par_iter().map(is_below).collect()
    } else {
        values.iter().map(is_below).collect()
    }
}
/// Element-wise `value >= threshold`.
///
/// Produces one flag per input element. Slices shorter than 16 elements are
/// scanned sequentially; longer ones are mapped through rayon's parallel
/// iterator.
fn simd_compare_ge(values: &[f64], threshold: f64) -> Vec<bool> {
    let is_at_least = move |value: &f64| *value >= threshold;
    if values.len() >= 16 {
        values.par_iter().map(is_at_least).collect()
    } else {
        values.iter().map(is_at_least).collect()
    }
}
/// Element-wise `value <= threshold`.
///
/// Produces one flag per input element. Slices shorter than 16 elements are
/// scanned sequentially; longer ones are mapped through rayon's parallel
/// iterator.
fn simd_compare_le(values: &[f64], threshold: f64) -> Vec<bool> {
    let is_at_most = move |value: &f64| *value <= threshold;
    if values.len() >= 16 {
        values.par_iter().map(is_at_most).collect()
    } else {
        values.iter().map(is_at_most).collect()
    }
}
/// Returns an owned copy of `hashes`, element for element and in order.
///
/// No transformation is applied to the values, so a single contiguous copy
/// is all that is required; splitting the input into chunks and copying
/// each chunk into its own vector would only add allocations.
fn simd_hash_batch(hashes: &[u64]) -> Vec<u64> {
    hashes.to_vec()
}
/// Matches a triple pattern against candidate triples by comparing the
/// candidates' precomputed term hashes with hashes of the pattern's terms.
pub struct SimdTripleMatcher {
// Initialised empty by `new`; nothing in the visible code reads or fills it,
// hence the `dead_code` allowance.
#[allow(dead_code)]
pattern_cache: HashMap<String, VectorizedPattern>,
// Supplies `min_batch_size` (scalar vs. batched path) and `simd_width`.
config: SimdConfig,
}
/// Tuning knobs shared by the batched operators in this file.
#[derive(Debug, Clone)]
pub struct SimdConfig {
    /// Inputs with at least this many elements take the batched/parallel
    /// code path; smaller inputs are handled sequentially.
    pub min_batch_size: usize,
    /// Auto-vectorisation toggle. Not consulted by the code visible in this
    /// file.
    pub enable_auto_vectorize: bool,
    /// Vector width; callers divide it by 64 to obtain the number of `u64`
    /// elements per batch (so presumably expressed in bits).
    pub simd_width: usize,
}

impl Default for SimdConfig {
    /// Defaults: batches of 16 or more, auto-vectorisation on, width 256.
    fn default() -> Self {
        let min_batch_size = 16;
        let simd_width = 256;
        SimdConfig {
            min_batch_size,
            enable_auto_vectorize: true,
            simd_width,
        }
    }
}
/// Column-wise hash representation of a pattern, stored as the value type of
/// `SimdTripleMatcher::pattern_cache`.
///
/// NOTE(review): never constructed in the visible code (hence `dead_code`);
/// the per-field meanings below are inferred from the names only — confirm
/// before relying on them.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct VectorizedPattern {
// Presumably hashes for the subject position.
subject_hashes: Vec<u64>,
// Presumably hashes for the predicate position.
predicate_hashes: Vec<u64>,
// Presumably hashes for the object position.
object_hashes: Vec<u64>,
// Presumably marks which positions are variables — TODO confirm encoding.
binding_mask: Vec<u8>,
}
impl SimdTripleMatcher {
    /// Creates a matcher with an empty pattern cache and the given settings.
    pub fn new(config: SimdConfig) -> Self {
        Self {
            pattern_cache: HashMap::new(),
            config,
        }
    }

    /// Returns one binding for every candidate that matches `pattern`.
    ///
    /// A candidate matches when, for each non-variable position of the
    /// pattern, the candidate's precomputed hash equals the hash of the
    /// pattern term. Variable positions match anything and are bound to the
    /// candidate's term in the resulting binding.
    ///
    /// Inputs with at least `config.min_batch_size` candidates use the
    /// batched path, smaller inputs the scalar one. An empty candidate list
    /// yields an empty result.
    ///
    /// NOTE(review): matching is hash-only, and a variable that occurs in
    /// more than one position is simply bound to the last position's term
    /// without checking that the positions agree.
    pub fn match_pattern(
        &mut self,
        pattern: &TriplePattern,
        candidates: &[TripleCandidate],
    ) -> Result<Vec<Binding>> {
        if candidates.is_empty() {
            return Ok(Vec::new());
        }
        if candidates.len() >= self.config.min_batch_size {
            self.match_pattern_simd(pattern, candidates)
        } else {
            self.match_pattern_scalar(pattern, candidates)
        }
    }

    /// Hashes the subject, predicate and object of `pattern` (in that order)
    /// as `(hash, is_variable)` pairs.
    fn pattern_hashes(&self, pattern: &TriplePattern) -> [(u64, bool); 3] {
        [
            self.term_to_hash(&pattern.subject),
            self.term_to_hash(&pattern.predicate),
            self.term_to_hash(&pattern.object),
        ]
    }

    /// Batched matching: candidate hashes are laid out per position and
    /// compared against the pattern hash one batch at a time.
    fn match_pattern_simd(
        &self,
        pattern: &TriplePattern,
        candidates: &[TripleCandidate],
    ) -> Result<Vec<Binding>> {
        let wanted = self.pattern_hashes(pattern);

        // One contiguous hash column per triple position, same order as `wanted`.
        let columns: [Vec<u64>; 3] = [
            candidates.iter().map(|c| c.subject_hash).collect(),
            candidates.iter().map(|c| c.predicate_hash).collect(),
            candidates.iter().map(|c| c.object_hash).collect(),
        ];

        // Number of 64-bit hashes per batch. Clamped to 1 so that a
        // `simd_width` below 64 cannot produce a zero batch size.
        let lanes = (self.config.simd_width / 64).max(1);

        let mut results = Vec::new();
        for (batch_idx, batch) in candidates.chunks(lanes).enumerate() {
            let start = batch_idx * lanes;
            let end = start + batch.len();

            let mut match_mask = vec![true; batch.len()];
            for (&(hash, is_var), column) in wanted.iter().zip(&columns) {
                if is_var {
                    // Variables accept every candidate at this position.
                    continue;
                }
                let matches = simd_compare_eq(&column[start..end], hash);
                for (mask, is_match) in match_mask.iter_mut().zip(matches) {
                    *mask &= is_match;
                }
            }

            for (candidate, matched) in batch.iter().zip(match_mask) {
                if matched {
                    results.push(self.create_binding(pattern, candidate)?);
                }
            }
        }
        Ok(results)
    }

    /// Sequential matching for small inputs.
    fn match_pattern_scalar(
        &self,
        pattern: &TriplePattern,
        candidates: &[TripleCandidate],
    ) -> Result<Vec<Binding>> {
        // Hash the pattern once instead of once per candidate.
        let wanted = self.pattern_hashes(pattern);
        let mut results = Vec::new();
        for candidate in candidates {
            if Self::matches_candidate(&wanted, candidate) {
                results.push(self.create_binding(pattern, candidate)?);
            }
        }
        Ok(results)
    }

    /// True when every non-variable entry of `wanted` equals the candidate's
    /// hash at the same position.
    fn matches_candidate(wanted: &[(u64, bool); 3], candidate: &TripleCandidate) -> bool {
        let actual = [
            candidate.subject_hash,
            candidate.predicate_hash,
            candidate.object_hash,
        ];
        wanted
            .iter()
            .zip(actual)
            .all(|(&(hash, is_var), got)| is_var || hash == got)
    }

    /// Builds the binding for a matching candidate: each variable position
    /// of the pattern is bound to a clone of the candidate's term there.
    fn create_binding(
        &self,
        pattern: &TriplePattern,
        candidate: &TripleCandidate,
    ) -> Result<Binding> {
        let positions = [
            (&pattern.subject, &candidate.subject),
            (&pattern.predicate, &candidate.predicate),
            (&pattern.object, &candidate.object),
        ];
        let mut binding = Binding::new();
        for (pattern_term, value) in positions {
            if let Term::Variable(var) = pattern_term {
                binding.insert(var.clone(), value.clone());
            }
        }
        Ok(binding)
    }

    /// Maps a pattern term to `(hash, is_variable)`.
    ///
    /// IRIs, literals and blank nodes are hashed by their string form.
    /// Variables and any other term kind get hash 0; only variables are
    /// flagged as such.
    fn term_to_hash(&self, term: &Term) -> (u64, bool) {
        match term {
            Term::Variable(_) => (0, true),
            Term::Iri(iri) => (self.hash_string(iri.as_str()), false),
            Term::Literal(lit) => (self.hash_string(&lit.value), false),
            Term::BlankNode(bn) => (self.hash_string(bn), false),
            _ => (0, false),
        }
    }

    /// Hashes a string with the standard library's `DefaultHasher`.
    fn hash_string(&self, s: &str) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        s.hash(&mut hasher);
        hasher.finish()
    }
}
/// A concrete triple together with a precomputed hash for each of its terms.
#[derive(Debug, Clone)]
pub struct TripleCandidate {
    /// Subject term.
    pub subject: Term,
    /// Predicate term.
    pub predicate: Term,
    /// Object term.
    pub object: Term,
    /// Hash of `subject`.
    pub subject_hash: u64,
    /// Hash of `predicate`.
    pub predicate_hash: u64,
    /// Hash of `object`.
    pub object_hash: u64,
}

impl TripleCandidate {
    /// Wraps the three terms and computes their hashes.
    ///
    /// IRIs, literals and blank nodes are hashed by their string form with
    /// `DefaultHasher`; every other term kind feeds nothing into the hasher
    /// and therefore receives the hasher's initial output.
    pub fn new(subject: Term, predicate: Term, object: Term) -> Self {
        fn digest(term: &Term) -> u64 {
            use std::collections::hash_map::DefaultHasher;
            use std::hash::{Hash, Hasher};

            let mut state = DefaultHasher::new();
            match term {
                Term::Iri(iri) => iri.as_str().hash(&mut state),
                Term::Literal(lit) => lit.value.hash(&mut state),
                Term::BlankNode(bn) => bn.hash(&mut state),
                _ => {}
            }
            state.finish()
        }

        // Hash while the terms are still borrowed, then move them in.
        let subject_hash = digest(&subject);
        let predicate_hash = digest(&predicate);
        let object_hash = digest(&object);

        TripleCandidate {
            subject,
            predicate,
            object,
            subject_hash,
            predicate_hash,
            object_hash,
        }
    }
}
/// Hash join over slices of bindings that also keeps execution counters.
pub struct SimdHashJoin {
// Supplies `simd_width` (batch size) and `min_batch_size`.
config: SimdConfig,
// Counters updated while joining; exposed via `get_stats` / `reset_stats`.
stats: JoinStats,
}
/// Counters collected by `SimdHashJoin`. All fields start at zero via `Default`.
#[derive(Debug, Default, Clone)]
pub struct JoinStats {
// Incremented once per batch processed while building the hash table.
pub simd_batches: usize,
// NOTE(review): not written anywhere in the visible code.
pub scalar_ops: usize,
// Incremented once per build-side entry visited while probing.
pub total_comparisons: u64,
// NOTE(review): not written anywhere in the visible code.
pub speedup_factor: f64,
}
impl SimdHashJoin {
pub fn new(config: SimdConfig) -> Self {
Self {
config,
stats: JoinStats::default(),
}
}
pub fn join(
&mut self,
left: &[Binding],
right: &[Binding],
join_vars: &[Variable],
) -> Result<Vec<Binding>> {
if left.is_empty() || right.is_empty() || join_vars.is_empty() {
return Ok(Vec::new());
}
let (build_side, probe_side, build_is_left) = if left.len() <= right.len() {
(left, right, true)
} else {
(right, left, false)
};
let hash_table = self.build_hash_table_simd(build_side, join_vars)?;
self.probe_hash_table_simd(probe_side, &hash_table, join_vars, build_is_left)
}
fn build_hash_table_simd(
&mut self,
bindings: &[Binding],
join_vars: &[Variable],
) -> Result<HashMap<u64, Vec<usize>>> {
let mut hash_table: HashMap<u64, Vec<usize>> = HashMap::new();
let batch_size = self.config.simd_width / 64;
for chunk in bindings.chunks(batch_size) {
let hashes = self.compute_batch_hashes(chunk, join_vars)?;
for (i, &hash) in hashes.iter().enumerate() {
let binding_idx = (chunk.as_ptr() as usize - bindings.as_ptr() as usize)
/ std::mem::size_of::<Binding>()
+ i;
hash_table.entry(hash).or_default().push(binding_idx);
}
self.stats.simd_batches += 1;
}
Ok(hash_table)
}
fn probe_hash_table_simd(
&mut self,
probe_bindings: &[Binding],
hash_table: &HashMap<u64, Vec<usize>>,
join_vars: &[Variable],
_build_is_left: bool, ) -> Result<Vec<Binding>> {
let results = Vec::new();
let batch_size = self.config.simd_width / 64;
for chunk in probe_bindings.chunks(batch_size) {
let hashes = self.compute_batch_hashes(chunk, join_vars)?;
for &hash in hashes.iter() {
if let Some(build_indices) = hash_table.get(&hash) {
for &_build_idx in build_indices {
self.stats.total_comparisons += 1;
}
}
}
}
Ok(results)
}
fn compute_batch_hashes(
&self,
bindings: &[Binding],
join_vars: &[Variable],
) -> Result<Vec<u64>> {
let mut hashes = Vec::with_capacity(bindings.len());
for binding in bindings {
let mut combined_hash = 0u64;
for var in join_vars {
if let Some(term) = binding.get(var) {
let term_hash = self.hash_term(term);
combined_hash = combined_hash.wrapping_mul(31).wrapping_add(term_hash);
}
}
hashes.push(combined_hash);
}
if hashes.len() >= self.config.min_batch_size {
let simd_hashes = simd_hash_batch(&hashes);
Ok(simd_hashes)
} else {
Ok(hashes)
}
}
fn hash_term(&self, term: &Term) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
match term {
Term::Iri(iri) => iri.as_str().hash(&mut hasher),
Term::Literal(lit) => lit.value.hash(&mut hasher),
Term::BlankNode(bn) => bn.hash(&mut hasher),
Term::Variable(var) => var.as_str().hash(&mut hasher),
_ => {}
}
hasher.finish()
}
pub fn get_stats(&self) -> &JoinStats {
&self.stats
}
pub fn reset_stats(&mut self) {
self.stats = JoinStats::default();
}
}
/// Evaluates numeric comparison filters over a slice of bindings.
pub struct SimdFilterEvaluator {
// `min_batch_size` decides between the batched and the scalar comparison path.
config: SimdConfig,
}
impl SimdFilterEvaluator {
    /// Creates an evaluator using the given configuration.
    pub fn new(config: SimdConfig) -> Self {
        Self { config }
    }

    /// Evaluates `?var <operator> threshold` for every binding.
    ///
    /// Returns one flag per binding, in input order. A binding in which
    /// `var` is unbound, or bound to something that does not parse as a
    /// number, is represented internally as NaN and always yields `false`,
    /// regardless of the operator and of which code path is taken.
    ///
    /// Inputs with at least `config.min_batch_size` bindings use the batched
    /// comparison helpers; smaller inputs are compared one by one.
    pub fn evaluate_numeric_filter(
        &self,
        bindings: &[Binding],
        var: &Variable,
        operator: ComparisonOp,
        threshold: f64,
    ) -> Result<Vec<bool>> {
        if bindings.is_empty() {
            return Ok(Vec::new());
        }

        let values: Vec<f64> = bindings
            .iter()
            .map(|binding| {
                binding
                    .get(var)
                    .and_then(|term| self.term_to_numeric(term))
                    .unwrap_or(f64::NAN)
            })
            .collect();

        if values.len() >= self.config.min_batch_size {
            self.simd_compare(&values, operator, threshold)
        } else {
            Ok(values
                .iter()
                .map(|&v| !v.is_nan() && self.scalar_compare(v, operator, threshold))
                .collect())
        }
    }

    /// Batched comparison of `values` against `threshold`.
    ///
    /// Produces exactly the flags `scalar_compare` would produce element by
    /// element.
    fn simd_compare(
        &self,
        values: &[f64],
        operator: ComparisonOp,
        threshold: f64,
    ) -> Result<Vec<bool>> {
        let results = match operator {
            ComparisonOp::Gt => simd_compare_gt(values, threshold),
            ComparisonOp::Lt => simd_compare_lt(values, threshold),
            ComparisonOp::Ge => simd_compare_ge(values, threshold),
            ComparisonOp::Le => simd_compare_le(values, threshold),
            ComparisonOp::Eq => simd_compare_eq_f64(values, threshold),
            // "Not equal" must not be derived by negating "equal": a NaN
            // value (unbound / non-numeric) is not equal, and negation would
            // let it pass the filter. The direct predicate is false for NaN,
            // matching the scalar path.
            ComparisonOp::Ne => values
                .iter()
                .map(|&v| self.scalar_compare(v, operator, threshold))
                .collect(),
        };
        Ok(results)
    }

    /// Compares a single value against `threshold`.
    ///
    /// `Eq` / `Ne` use an absolute tolerance of `f64::EPSILON`. Every
    /// operator yields `false` when `value` is NaN.
    fn scalar_compare(&self, value: f64, operator: ComparisonOp, threshold: f64) -> bool {
        match operator {
            ComparisonOp::Gt => value > threshold,
            ComparisonOp::Lt => value < threshold,
            ComparisonOp::Ge => value >= threshold,
            ComparisonOp::Le => value <= threshold,
            ComparisonOp::Eq => (value - threshold).abs() < f64::EPSILON,
            ComparisonOp::Ne => (value - threshold).abs() >= f64::EPSILON,
        }
    }

    /// Parses the lexical value of a literal as `f64`; any other term kind,
    /// or an unparsable literal, gives `None`.
    fn term_to_numeric(&self, term: &Term) -> Option<f64> {
        match term {
            Term::Literal(lit) => lit.value.parse::<f64>().ok(),
            _ => None,
        }
    }
}
/// Comparison operators supported by `SimdFilterEvaluator`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComparisonOp {
// value > threshold
Gt,
// value < threshold
Lt,
// value >= threshold
Ge,
// value <= threshold
Le,
// |value - threshold| < f64::EPSILON
Eq,
// |value - threshold| >= f64::EPSILON
Ne,
}
/// Numeric aggregations and small vector/matrix helpers over `f64` data.
pub struct SimdAggregations {
// `min_batch_size` decides when the array/parallel code paths are used.
config: SimdConfig,
}
impl SimdAggregations {
pub fn new(config: SimdConfig) -> Self {
Self { config }
}
pub fn sum(&self, values: &[f64]) -> f64 {
if values.is_empty() {
return 0.0;
}
if values.len() < self.config.min_batch_size {
return values.iter().sum();
}
let arr = Array1::from_vec(values.to_vec());
arr.sum()
}
pub fn avg(&self, values: &[f64]) -> f64 {
if values.is_empty() {
return 0.0;
}
let sum = self.sum(values);
sum / values.len() as f64
}
pub fn min(&self, values: &[f64]) -> f64 {
if values.is_empty() {
return f64::NAN;
}
if values.len() < self.config.min_batch_size {
return values.iter().copied().fold(f64::INFINITY, f64::min);
}
values
.par_iter()
.copied()
.reduce(|| f64::INFINITY, |a, b| a.min(b))
}
pub fn max(&self, values: &[f64]) -> f64 {
if values.is_empty() {
return f64::NAN;
}
if values.len() < self.config.min_batch_size {
return values.iter().copied().fold(f64::NEG_INFINITY, f64::max);
}
values
.par_iter()
.copied()
.reduce(|| f64::NEG_INFINITY, |a, b| a.max(b))
}
pub fn dot_product(&self, vec1: &[f64], vec2: &[f64]) -> Result<f64> {
if vec1.len() != vec2.len() {
return Ok(0.0);
}
if vec1.is_empty() {
return Ok(0.0);
}
let arr1 = Array1::from_vec(vec1.to_vec());
let arr2 = Array1::from_vec(vec2.to_vec());
let result = (&arr1 * &arr2).sum();
Ok(result)
}
pub fn cosine_similarity(&self, vec1: &[f64], vec2: &[f64]) -> Result<f64> {
if vec1.len() != vec2.len() || vec1.is_empty() {
return Ok(0.0);
}
let dot = self.dot_product(vec1, vec2)?;
let norm1 = self.dot_product(vec1, vec1)?.sqrt();
let norm2 = self.dot_product(vec2, vec2)?.sqrt();
if norm1 == 0.0 || norm2 == 0.0 {
return Ok(0.0);
}
Ok(dot / (norm1 * norm2))
}
pub fn batch_matrix_multiply(
&self,
matrices_a: &[Array2<f64>],
matrices_b: &[Array2<f64>],
) -> Result<Vec<Array2<f64>>> {
if matrices_a.len() != matrices_b.len() {
return Ok(Vec::new());
}
let results: Vec<Array2<f64>> = matrices_a
.par_iter()
.zip(matrices_b.par_iter())
.map(|(a, b)| {
a.dot(b)
})
.collect();
Ok(results)
}
}
/// Batched string predicates and measurements over slices of `String`.
pub struct SimdStringOps {
// `min_batch_size` decides when the parallel code path is used.
config: SimdConfig,
}
impl SimdStringOps {
    /// Creates the string helper with the given configuration.
    pub fn new(config: SimdConfig) -> Self {
        Self { config }
    }

    /// Length of every string as reported by `String::len` (bytes, not
    /// characters), in input order.
    pub fn batch_strlen(&self, strings: &[String]) -> Vec<usize> {
        let byte_len = |text: &String| text.len();
        if strings.len() >= self.config.min_batch_size {
            strings.par_iter().map(byte_len).collect()
        } else {
            strings.iter().map(byte_len).collect()
        }
    }

    /// For every string, whether it contains `pattern` as a substring.
    pub fn batch_contains(&self, strings: &[String], pattern: &str) -> Vec<bool> {
        let has_pattern = |text: &String| text.contains(pattern);
        if strings.len() >= self.config.min_batch_size {
            strings.par_iter().map(has_pattern).collect()
        } else {
            strings.iter().map(has_pattern).collect()
        }
    }

    /// For every string, whether it begins with `prefix`.
    pub fn batch_starts_with(&self, strings: &[String], prefix: &str) -> Vec<bool> {
        let has_prefix = |text: &String| text.starts_with(prefix);
        if strings.len() >= self.config.min_batch_size {
            strings.par_iter().map(has_prefix).collect()
        } else {
            strings.iter().map(has_prefix).collect()
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use oxirs_core::model::NamedNode;

    /// Builds an IRI term; panics if the IRI is rejected.
    fn iri(value: &str) -> Term {
        Term::Iri(NamedNode::new(value).unwrap())
    }

    /// Builds a variable; panics if the name is rejected.
    fn var(name: &str) -> Variable {
        Variable::new(name.to_string()).unwrap()
    }

    // Only the candidate whose predicate equals the pattern's predicate matches.
    #[test]
    fn test_simd_triple_matcher() {
        let mut matcher = SimdTripleMatcher::new(SimdConfig::default());

        let pattern = TriplePattern {
            subject: Term::Variable(var("s")),
            predicate: iri("http://example.org/pred"),
            object: Term::Variable(var("o")),
        };

        let matching = TripleCandidate::new(
            iri("http://example.org/s1"),
            iri("http://example.org/pred"),
            iri("http://example.org/o1"),
        );
        let non_matching = TripleCandidate::new(
            iri("http://example.org/s2"),
            iri("http://example.org/other"),
            iri("http://example.org/o2"),
        );
        let candidates = vec![matching, non_matching];

        let matches = matcher.match_pattern(&pattern, &candidates).unwrap();
        assert_eq!(matches.len(), 1);
    }

    // Running a join must be reflected in the statistics.
    #[test]
    fn test_simd_hash_join() {
        let mut join = SimdHashJoin::new(SimdConfig::default());
        let var_x = var("x");
        let var_y = var("y");

        let mut left_row = Binding::new();
        left_row.insert(var_x.clone(), iri("http://example.org/1"));
        left_row.insert(var_y.clone(), iri("http://example.org/a"));
        let left = vec![left_row];

        let mut right_row = Binding::new();
        right_row.insert(var_x.clone(), iri("http://example.org/1"));
        let right = vec![right_row];

        let join_vars = vec![var_x];
        let _result = join.join(&left, &right, &join_vars).unwrap();

        let stats = join.get_stats();
        assert!(stats.simd_batches > 0 || stats.scalar_ops > 0);
    }

    // Of the values 1..=20, exactly 11..=20 are greater than 10.
    #[test]
    fn test_simd_filter_evaluator() {
        let evaluator = SimdFilterEvaluator::new(SimdConfig::default());
        let var_x = var("x");

        let mut bindings: Vec<Binding> = Vec::new();
        for i in 1..=20 {
            let mut row = Binding::new();
            row.insert(
                var_x.clone(),
                Term::Literal(crate::algebra::Literal {
                    value: i.to_string(),
                    language: None,
                    datatype: None,
                }),
            );
            bindings.push(row);
        }

        let results = evaluator
            .evaluate_numeric_filter(&bindings, &var_x, ComparisonOp::Gt, 10.0)
            .unwrap();
        let true_count = results.iter().filter(|&&flag| flag).count();
        assert_eq!(true_count, 10);
    }

    #[test]
    fn test_simd_aggregations() {
        let agg = SimdAggregations::new(SimdConfig::default());
        let values: Vec<f64> = (1..=100).map(|i| i as f64).collect();

        assert_eq!(agg.sum(&values), 5050.0);
        assert_eq!(agg.avg(&values), 50.5);
        assert_eq!(agg.min(&values), 1.0);
        assert_eq!(agg.max(&values), 100.0);
    }

    #[test]
    fn test_simd_dot_product() {
        let agg = SimdAggregations::new(SimdConfig::default());
        let vec1 = vec![1.0, 2.0, 3.0, 4.0];
        let vec2 = vec![5.0, 6.0, 7.0, 8.0];

        let dot = agg.dot_product(&vec1, &vec2).unwrap();
        assert!((dot - 70.0).abs() < 1e-10);
    }

    #[test]
    fn test_simd_cosine_similarity() {
        let agg = SimdAggregations::new(SimdConfig::default());

        // A vector is perfectly similar to itself.
        let vec1 = vec![1.0, 2.0, 3.0];
        let similarity = agg.cosine_similarity(&vec1, &vec1).unwrap();
        assert!((similarity - 1.0).abs() < 1e-10);

        // Orthogonal vectors have zero similarity.
        let vec2 = vec![1.0, 0.0];
        let vec3 = vec![0.0, 1.0];
        let similarity2 = agg.cosine_similarity(&vec2, &vec3).unwrap();
        assert!(similarity2.abs() < 1e-10);
    }

    #[test]
    fn test_simd_string_ops() {
        let string_ops = SimdStringOps::new(SimdConfig::default());
        let strings: Vec<String> = (0..20).map(|i| format!("test_string_{}", i)).collect();

        let lengths = string_ops.batch_strlen(&strings);
        assert_eq!(lengths.len(), 20);
        assert!(lengths.iter().all(|&len| len > 0));

        let contains = string_ops.batch_contains(&strings, "test");
        assert_eq!(contains.len(), 20);
        assert!(contains.iter().all(|&hit| hit));

        let starts = string_ops.batch_starts_with(&strings, "test_");
        assert_eq!(starts.len(), 20);
        assert!(starts.iter().all(|&hit| hit));
    }
}