use crate::error::{FusekiError, FusekiResult};
use scirs2_core::memory::BufferPool;
use scirs2_core::metrics::{Counter, Histogram};
use scirs2_core::ndarray_ext::{Array1, Array2, ArrayView1};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// A query pattern over the three positions of a triple.
///
/// Each position is either bound to a concrete term (`Some`) or left
/// unconstrained (`None`); an unconstrained position matches any term.
#[derive(Debug, Clone, PartialEq)]
pub struct TriplePattern {
/// Required subject term, or `None` to accept any subject.
pub subject: Option<String>,
/// Required predicate term, or `None` to accept any predicate.
pub predicate: Option<String>,
/// Required object term, or `None` to accept any object.
pub object: Option<String>,
}
/// A stored triple together with a precomputed 64-bit hash of each term.
///
/// The hashes are produced by [`Triple::new`] and let callers key lookups
/// and pre-filter candidates without touching the strings.
#[derive(Debug, Clone)]
pub struct Triple {
    /// Subject term.
    pub subject: String,
    /// Predicate term.
    pub predicate: String,
    /// Object term.
    pub object: String,
    /// Hash of `subject` as computed by [`Triple::new`].
    pub subject_hash: u64,
    /// Hash of `predicate` as computed by [`Triple::new`].
    pub predicate_hash: u64,
    /// Hash of `object` as computed by [`Triple::new`].
    pub object_hash: u64,
}

impl Triple {
    /// Builds a triple, hashing each term independently with a fresh
    /// `DefaultHasher` so that equal terms always yield equal hashes.
    pub fn new(subject: String, predicate: String, object: String) -> Self {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        // `String` hashes exactly like the `str` it derefs to, so hashing the
        // borrowed slice yields the same value as hashing the owned string.
        let hash_of = |term: &str| -> u64 {
            let mut state = DefaultHasher::new();
            term.hash(&mut state);
            state.finish()
        };

        let subject_hash = hash_of(&subject);
        let predicate_hash = hash_of(&predicate);
        let object_hash = hash_of(&object);

        Self {
            subject,
            predicate,
            object,
            subject_hash,
            predicate_hash,
            object_hash,
        }
    }
}
/// In-memory triple store with per-position hash indexes and match statistics.
pub struct SimdTripleMatcher {
// All stored triples; the index maps below hold positions into this vector.
triples: Vec<Triple>,
// Subject hash -> positions of triples carrying that subject hash.
subject_index: HashMap<u64, Vec<usize>>,
// Predicate hash -> positions of triples carrying that predicate hash.
predicate_index: HashMap<u64, Vec<usize>>,
// Object hash -> positions of triples carrying that object hash.
object_index: HashMap<u64, Vec<usize>>,
// NOTE(review): created in `new` but not read by any method visible in this
// file — presumably reserved for scratch buffers; confirm before removing.
buffer_pool: Arc<BufferPool<u8>>,
// Metric registered under the name "triple_matches".
matches_counter: Counter,
// Metric registered under the name "match_time_ms".
match_time_histogram: Histogram,
// Metric registered under the name "simd_operations".
simd_operations_counter: Counter,
// Number of match operations recorded by `match_pattern`.
total_matches: AtomicU64,
// Number of recorded match operations that took the `simd_match` path.
simd_accelerated_matches: AtomicU64,
// Number of recorded match operations that took the `fallback_match` path.
fallback_matches: AtomicU64,
}
impl SimdTripleMatcher {
pub fn new() -> Self {
let buffer_pool = Arc::new(BufferPool::new());
Self {
triples: Vec::new(),
subject_index: HashMap::new(),
predicate_index: HashMap::new(),
object_index: HashMap::new(),
buffer_pool,
matches_counter: Counter::new("triple_matches".to_string()),
match_time_histogram: Histogram::new("match_time_ms".to_string()),
simd_operations_counter: Counter::new("simd_operations".to_string()),
total_matches: AtomicU64::new(0),
simd_accelerated_matches: AtomicU64::new(0),
fallback_matches: AtomicU64::new(0),
}
}
pub fn add_triple(&mut self, triple: Triple) {
let index = self.triples.len();
self.subject_index
.entry(triple.subject_hash)
.or_insert_with(Vec::new)
.push(index);
self.predicate_index
.entry(triple.predicate_hash)
.or_insert_with(Vec::new)
.push(index);
self.object_index
.entry(triple.object_hash)
.or_insert_with(Vec::new)
.push(index);
self.triples.push(triple);
}
pub fn add_triples(&mut self, triples: Vec<Triple>) {
for triple in triples {
self.add_triple(triple);
}
}
pub fn match_pattern(&self, pattern: &TriplePattern) -> FusekiResult<Vec<&Triple>> {
let start_time = std::time::Instant::now();
let candidate_indices = self.get_candidate_indices(pattern);
if candidate_indices.is_empty() {
return Ok(Vec::new());
}
let results = if candidate_indices.len() >= 32 {
self.simd_match(&candidate_indices, pattern)?
} else {
self.fallback_match(&candidate_indices, pattern)
};
self.matches_counter.inc();
self.match_time_histogram
.observe(start_time.elapsed().as_secs_f64());
self.total_matches.fetch_add(1, Ordering::Relaxed);
if candidate_indices.len() >= 32 {
self.simd_accelerated_matches
.fetch_add(1, Ordering::Relaxed);
self.simd_operations_counter.inc();
} else {
self.fallback_matches.fetch_add(1, Ordering::Relaxed);
}
Ok(results)
}
fn get_candidate_indices(&self, pattern: &TriplePattern) -> Vec<usize> {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut candidates: Option<Vec<usize>> = None;
if let Some(ref subject) = pattern.subject {
let mut hasher = DefaultHasher::new();
subject.hash(&mut hasher);
let hash = hasher.finish();
if let Some(indices) = self.subject_index.get(&hash) {
candidates = Some(indices.clone());
} else {
return Vec::new();
}
}
if let Some(ref predicate) = pattern.predicate {
let mut hasher = DefaultHasher::new();
predicate.hash(&mut hasher);
let hash = hasher.finish();
if let Some(indices) = self.predicate_index.get(&hash) {
candidates = match candidates {
Some(existing) => Some(
existing
.into_iter()
.filter(|i| indices.contains(i))
.collect(),
),
None => Some(indices.clone()),
};
} else {
return Vec::new();
}
}
if let Some(ref object) = pattern.object {
let mut hasher = DefaultHasher::new();
object.hash(&mut hasher);
let hash = hasher.finish();
if let Some(indices) = self.object_index.get(&hash) {
candidates = match candidates {
Some(existing) => Some(
existing
.into_iter()
.filter(|i| indices.contains(i))
.collect(),
),
None => Some(indices.clone()),
};
} else {
return Vec::new();
}
}
candidates.unwrap_or_else(|| (0..self.triples.len()).collect())
}
fn simd_match(
&self,
candidate_indices: &[usize],
pattern: &TriplePattern,
) -> FusekiResult<Vec<&Triple>> {
let subject_hashes: Vec<u64> = candidate_indices
.iter()
.map(|&i| self.triples[i].subject_hash)
.collect();
let predicate_hashes: Vec<u64> = candidate_indices
.iter()
.map(|&i| self.triples[i].predicate_hash)
.collect();
let object_hashes: Vec<u64> = candidate_indices
.iter()
.map(|&i| self.triples[i].object_hash)
.collect();
let subject_array = Array1::from_vec(subject_hashes);
let predicate_array = Array1::from_vec(predicate_hashes);
let object_array = Array1::from_vec(object_hashes);
let mut mask = vec![true; candidate_indices.len()];
if let Some(ref subject) = pattern.subject {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
subject.hash(&mut hasher);
let target_hash = hasher.finish();
for (i, &hash) in subject_array.iter().enumerate() {
mask[i] = mask[i] && (hash == target_hash);
}
}
if let Some(ref predicate) = pattern.predicate {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
predicate.hash(&mut hasher);
let target_hash = hasher.finish();
for (i, &hash) in predicate_array.iter().enumerate() {
mask[i] = mask[i] && (hash == target_hash);
}
}
if let Some(ref object) = pattern.object {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
object.hash(&mut hasher);
let target_hash = hasher.finish();
for (i, &hash) in object_array.iter().enumerate() {
mask[i] = mask[i] && (hash == target_hash);
}
}
let results: Vec<&Triple> = candidate_indices
.iter()
.enumerate()
.filter(|(i, _)| mask[*i])
.map(|(_, &idx)| &self.triples[idx])
.collect();
Ok(results)
}
fn fallback_match(&self, candidate_indices: &[usize], pattern: &TriplePattern) -> Vec<&Triple> {
candidate_indices
.iter()
.map(|&i| &self.triples[i])
.filter(|triple| {
if let Some(ref subject) = pattern.subject {
if &triple.subject != subject {
return false;
}
}
if let Some(ref predicate) = pattern.predicate {
if &triple.predicate != predicate {
return false;
}
}
if let Some(ref object) = pattern.object {
if &triple.object != object {
return false;
}
}
true
})
.collect()
}
pub fn get_statistics(&self) -> MatcherStatistics {
MatcherStatistics {
total_triples: self.triples.len(),
total_matches: self.total_matches.load(Ordering::Relaxed),
simd_accelerated_matches: self.simd_accelerated_matches.load(Ordering::Relaxed),
fallback_matches: self.fallback_matches.load(Ordering::Relaxed),
index_sizes: IndexSizes {
subject_index_size: self.subject_index.len(),
predicate_index_size: self.predicate_index.len(),
object_index_size: self.object_index.len(),
},
}
}
pub fn clear(&mut self) {
self.triples.clear();
self.subject_index.clear();
self.predicate_index.clear();
self.object_index.clear();
self.total_matches.store(0, Ordering::Relaxed);
self.simd_accelerated_matches.store(0, Ordering::Relaxed);
self.fallback_matches.store(0, Ordering::Relaxed);
}
}
impl Default for SimdTripleMatcher {
fn default() -> Self {
Self::new()
}
}
/// Snapshot of matcher state as returned by `SimdTripleMatcher::get_statistics`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct MatcherStatistics {
/// Number of triples currently stored.
pub total_triples: usize,
/// Number of match operations recorded so far.
pub total_matches: u64,
/// Recorded match operations that used the SIMD-labelled path.
pub simd_accelerated_matches: u64,
/// Recorded match operations that used the fallback path.
pub fallback_matches: u64,
/// Sizes of the three hash indexes.
pub index_sizes: IndexSizes,
}
/// Number of distinct hash keys held by each of the matcher's indexes.
#[derive(Debug, Clone, serde::Serialize)]
pub struct IndexSizes {
/// Distinct subject hashes in the subject index.
pub subject_index_size: usize,
/// Distinct predicate hashes in the predicate index.
pub predicate_index_size: usize,
/// Distinct object hashes in the object index.
pub object_index_size: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a triple from string slices.
    fn triple(subject: &str, predicate: &str, object: &str) -> Triple {
        Triple::new(
            subject.to_string(),
            predicate.to_string(),
            object.to_string(),
        )
    }

    /// Builds a pattern; `None` leaves the position unconstrained.
    fn pattern(
        subject: Option<&str>,
        predicate: Option<&str>,
        object: Option<&str>,
    ) -> TriplePattern {
        TriplePattern {
            subject: subject.map(String::from),
            predicate: predicate.map(String::from),
            object: object.map(String::from),
        }
    }

    #[test]
    fn test_triple_creation() {
        let created = triple(
            "http://example.org/subject",
            "http://example.org/predicate",
            "http://example.org/object",
        );
        assert_eq!(created.subject, "http://example.org/subject");
        assert_ne!(created.subject_hash, 0);
    }

    #[test]
    fn test_matcher_basic() {
        let mut matcher = SimdTripleMatcher::new();
        matcher.add_triple(triple(
            "http://example.org/s1",
            "http://example.org/p1",
            "http://example.org/o1",
        ));
        matcher.add_triple(triple(
            "http://example.org/s2",
            "http://example.org/p1",
            "http://example.org/o2",
        ));

        // Both triples share the predicate, so a predicate-only pattern hits both.
        let query = pattern(None, Some("http://example.org/p1"), None);
        let results = matcher.match_pattern(&query).unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_matcher_specific() {
        let mut matcher = SimdTripleMatcher::new();
        matcher.add_triple(triple(
            "http://example.org/subject",
            "http://example.org/predicate",
            "http://example.org/object",
        ));

        let query = pattern(
            Some("http://example.org/subject"),
            Some("http://example.org/predicate"),
            Some("http://example.org/object"),
        );
        let results = matcher.match_pattern(&query).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].subject, "http://example.org/subject");
    }

    #[test]
    fn test_matcher_no_match() {
        let mut matcher = SimdTripleMatcher::new();
        matcher.add_triple(triple(
            "http://example.org/s1",
            "http://example.org/p1",
            "http://example.org/o1",
        ));

        let query = pattern(Some("http://example.org/nonexistent"), None, None);
        let results = matcher.match_pattern(&query).unwrap();
        assert_eq!(results.len(), 0);
    }

    #[test]
    fn test_matcher_statistics() {
        let mut matcher = SimdTripleMatcher::new();
        for i in 0..100 {
            matcher.add_triple(Triple::new(
                format!("http://example.org/s{}", i),
                "http://example.org/p1".to_string(),
                format!("http://example.org/o{}", i),
            ));
        }

        let stats = matcher.get_statistics();
        assert_eq!(stats.total_triples, 100);
        assert!(stats.index_sizes.subject_index_size > 0);
    }

    #[test]
    fn test_matcher_batch_add() {
        let mut matcher = SimdTripleMatcher::new();
        let batch = vec![
            triple("s1", "p1", "o1"),
            triple("s2", "p2", "o2"),
            triple("s3", "p3", "o3"),
        ];
        matcher.add_triples(batch);

        let stats = matcher.get_statistics();
        assert_eq!(stats.total_triples, 3);
    }

    #[test]
    fn test_matcher_clear() {
        let mut matcher = SimdTripleMatcher::new();
        matcher.add_triple(triple("s1", "p1", "o1"));
        matcher.clear();

        let stats = matcher.get_statistics();
        assert_eq!(stats.total_triples, 0);
        assert_eq!(stats.total_matches, 0);
    }

    #[test]
    fn test_simd_acceleration_threshold() {
        let mut matcher = SimdTripleMatcher::new();
        // Exactly 32 candidates share the predicate, which reaches the SIMD path.
        for i in 0..32 {
            matcher.add_triple(Triple::new(
                format!("s{}", i),
                "p".to_string(),
                format!("o{}", i),
            ));
        }

        let query = pattern(None, Some("p"), None);
        let results = matcher.match_pattern(&query).unwrap();
        assert_eq!(results.len(), 32);

        let stats = matcher.get_statistics();
        assert_eq!(stats.simd_accelerated_matches, 1);
    }
}