use crate::StreamEvent;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use scirs2_core::ndarray_ext::{Array1, Array2};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimdBatchConfig {
pub batch_size: usize,
pub auto_vectorize: bool,
pub prefetch_distance: usize,
pub enable_parallel: bool,
}
impl Default for SimdBatchConfig {
fn default() -> Self {
Self {
batch_size: 1024, auto_vectorize: true,
prefetch_distance: 64,
enable_parallel: true,
}
}
}
pub struct SimdBatchProcessor {
config: SimdBatchConfig,
stats: SimdProcessorStats,
}
#[derive(Debug, Clone, Default)]
pub struct SimdProcessorStats {
pub batches_processed: u64,
pub events_processed: u64,
pub simd_operations: u64,
pub avg_batch_time_us: f64,
pub throughput_events_per_sec: f64,
}
impl SimdBatchProcessor {
pub fn new(config: SimdBatchConfig) -> Self {
Self {
config,
stats: SimdProcessorStats::default(),
}
}
pub fn process_batch<F>(
&mut self,
events: &[StreamEvent],
processor: F,
) -> Result<Vec<StreamEvent>>
where
F: Fn(&StreamEvent) -> bool + Send + Sync,
{
let start = std::time::Instant::now();
let filtered_events: Vec<StreamEvent> =
events.iter().filter(|e| processor(e)).cloned().collect();
let elapsed_us = start.elapsed().as_micros() as f64;
self.stats.batches_processed += 1;
self.stats.events_processed += events.len() as u64;
self.stats.simd_operations += (events.len() / self.config.batch_size) as u64;
let alpha = 0.1;
self.stats.avg_batch_time_us =
alpha * elapsed_us + (1.0 - alpha) * self.stats.avg_batch_time_us;
if elapsed_us > 0.0 {
self.stats.throughput_events_per_sec = (events.len() as f64 / elapsed_us) * 1_000_000.0;
}
Ok(filtered_events)
}
pub fn extract_numeric_batch(
&self,
events: &[StreamEvent],
field: &str,
) -> Result<Array1<f64>> {
let values: Vec<f64> = events
.iter()
.filter_map(|e| self.extract_numeric_value(e, field))
.collect();
Ok(Array1::from_vec(values))
}
pub fn aggregate_batch(
&mut self,
events: &[StreamEvent],
field: &str,
) -> Result<SimdAggregateResult> {
let start = std::time::Instant::now();
let values = self.extract_numeric_batch(events, field)?;
if values.is_empty() {
return Ok(SimdAggregateResult::default());
}
let sum = values.sum();
let mean = values.mean().unwrap_or(0.0);
let std_dev = values.std(0.0);
let min = values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
let max = values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
let elapsed_us = start.elapsed().as_micros() as f64;
self.stats.simd_operations += 1;
Ok(SimdAggregateResult {
count: values.len(),
sum,
mean,
std_dev,
min,
max,
processing_time_us: elapsed_us,
})
}
pub fn batch_pattern_match(
&mut self,
events: &[StreamEvent],
patterns: &[String],
) -> Result<Vec<(usize, String)>> {
let start = std::time::Instant::now();
let mut matches = Vec::new();
for (idx, event) in events.iter().enumerate() {
for pattern in patterns {
if self.matches_pattern(event, pattern) {
matches.push((idx, pattern.clone()));
}
}
}
let elapsed_us = start.elapsed().as_micros() as f64;
self.stats.simd_operations += 1;
self.stats.avg_batch_time_us = elapsed_us;
Ok(matches)
}
pub fn correlation_matrix(
&mut self,
events: &[StreamEvent],
fields: &[String],
) -> Result<Array2<f64>> {
let n_fields = fields.len();
let mut matrix = Array2::zeros((n_fields, n_fields));
let field_data: Vec<Array1<f64>> = fields
.iter()
.map(|field| self.extract_numeric_batch(events, field))
.collect::<Result<Vec<_>>>()?;
for i in 0..n_fields {
for j in i..n_fields {
let correlation = if i == j {
1.0
} else {
compute_simd_correlation(&field_data[i], &field_data[j])?
};
matrix[[i, j]] = correlation;
matrix[[j, i]] = correlation; }
}
self.stats.simd_operations += (n_fields * n_fields) as u64;
Ok(matrix)
}
pub fn deduplicate_batch(&mut self, events: &[StreamEvent]) -> Result<Vec<StreamEvent>> {
let start = std::time::Instant::now();
let mut seen = std::collections::HashSet::new();
let mut unique = Vec::new();
for event in events {
let hash = self.compute_event_hash(event);
if seen.insert(hash) {
unique.push(event.clone());
}
}
let elapsed_us = start.elapsed().as_micros() as f64;
self.stats.avg_batch_time_us = elapsed_us;
self.stats.simd_operations += 1;
Ok(unique)
}
pub fn moving_average(
&mut self,
events: &[StreamEvent],
field: &str,
window_size: usize,
) -> Result<Array1<f64>> {
let values = self.extract_numeric_batch(events, field)?;
if values.len() < window_size {
return Ok(Array1::from_vec(vec![]));
}
let mut moving_avgs = Vec::new();
for i in window_size..=values.len() {
let window = values.slice(s![i - window_size..i]);
let avg = window.mean().unwrap_or(0.0);
moving_avgs.push(avg);
}
self.stats.simd_operations += 1;
Ok(Array1::from_vec(moving_avgs))
}
fn extract_numeric_value(&self, event: &StreamEvent, field: &str) -> Option<f64> {
match event {
StreamEvent::TripleAdded { object, .. } | StreamEvent::TripleRemoved { object, .. } => {
if field == "object" {
object.parse::<f64>().ok()
} else {
None
}
}
_ => None,
}
}
fn matches_pattern(&self, event: &StreamEvent, pattern: &str) -> bool {
match event {
StreamEvent::TripleAdded { subject, .. } => subject.contains(pattern),
StreamEvent::QuadAdded { subject, .. } => subject.contains(pattern),
_ => false,
}
}
fn compute_event_hash(&self, event: &StreamEvent) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
match event {
StreamEvent::TripleAdded {
subject,
predicate,
object,
..
} => {
"triple_added".hash(&mut hasher);
subject.hash(&mut hasher);
predicate.hash(&mut hasher);
object.hash(&mut hasher);
}
StreamEvent::QuadAdded {
subject,
predicate,
object,
graph,
..
} => {
"quad_added".hash(&mut hasher);
subject.hash(&mut hasher);
predicate.hash(&mut hasher);
object.hash(&mut hasher);
graph.hash(&mut hasher);
}
_ => {
format!("{:?}", event).hash(&mut hasher);
}
}
hasher.finish()
}
pub fn stats(&self) -> &SimdProcessorStats {
&self.stats
}
pub fn reset_stats(&mut self) {
self.stats = SimdProcessorStats::default();
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimdAggregateResult {
pub count: usize,
pub sum: f64,
pub mean: f64,
pub std_dev: f64,
pub min: f64,
pub max: f64,
pub processing_time_us: f64,
}
impl Default for SimdAggregateResult {
fn default() -> Self {
Self {
count: 0,
sum: 0.0,
mean: 0.0,
std_dev: 0.0,
min: f64::INFINITY,
max: f64::NEG_INFINITY,
processing_time_us: 0.0,
}
}
}
fn compute_simd_correlation(a: &Array1<f64>, b: &Array1<f64>) -> Result<f64> {
if a.len() != b.len() || a.len() < 2 {
return Ok(0.0);
}
let mean_a = a.mean().unwrap_or(0.0);
let mean_b = b.mean().unwrap_or(0.0);
let mut sum_product = 0.0;
let mut sum_sq_a = 0.0;
let mut sum_sq_b = 0.0;
for i in 0..a.len() {
let diff_a = a[i] - mean_a;
let diff_b = b[i] - mean_b;
sum_product += diff_a * diff_b;
sum_sq_a += diff_a * diff_a;
sum_sq_b += diff_b * diff_b;
}
let denominator = (sum_sq_a * sum_sq_b).sqrt();
if denominator == 0.0 {
Ok(0.0)
} else {
Ok(sum_product / denominator)
}
}
type EventPredicate = Arc<dyn Fn(&StreamEvent) -> bool + Send + Sync>;
pub struct SimdEventFilter {
config: SimdBatchConfig,
predicates: Vec<EventPredicate>,
}
impl SimdEventFilter {
pub fn new(config: SimdBatchConfig) -> Self {
Self {
config,
predicates: Vec::new(),
}
}
pub fn add_predicate<F>(&mut self, predicate: F)
where
F: Fn(&StreamEvent) -> bool + Send + Sync + 'static,
{
self.predicates.push(Arc::new(predicate));
}
pub fn filter_batch(&self, events: &[StreamEvent]) -> Vec<StreamEvent> {
if self.predicates.is_empty() {
return events.to_vec();
}
events
.iter()
.filter(|event| self.predicates.iter().all(|pred| pred(event)))
.cloned()
.collect()
}
}
use scirs2_core::ndarray_ext::s;
#[cfg(test)]
mod tests {
use super::*;
use crate::event::EventMetadata;
fn create_test_event(subject: &str, value: &str) -> StreamEvent {
StreamEvent::TripleAdded {
subject: subject.to_string(),
predicate: "hasValue".to_string(),
object: value.to_string(),
graph: None,
metadata: EventMetadata::default(),
}
}
#[test]
fn test_simd_batch_processor() {
let config = SimdBatchConfig::default();
let mut processor = SimdBatchProcessor::new(config);
let events: Vec<StreamEvent> = (0..1000)
.map(|i| create_test_event(&format!("subject_{}", i), &i.to_string()))
.collect();
let result =
processor.process_batch(&events, |e| matches!(e, StreamEvent::TripleAdded { .. }));
assert!(result.is_ok());
let filtered = result.unwrap();
assert_eq!(filtered.len(), 1000);
let stats = processor.stats();
assert_eq!(stats.batches_processed, 1);
assert!(stats.throughput_events_per_sec > 0.0);
}
#[test]
fn test_simd_aggregation() {
let config = SimdBatchConfig::default();
let mut processor = SimdBatchProcessor::new(config);
let events: Vec<StreamEvent> = (1..=100)
.map(|i| create_test_event(&format!("subject_{}", i), &i.to_string()))
.collect();
let result = processor.aggregate_batch(&events, "object").unwrap();
assert_eq!(result.count, 100);
assert_eq!(result.sum, 5050.0); assert_eq!(result.mean, 50.5);
assert_eq!(result.min, 1.0);
assert_eq!(result.max, 100.0);
}
#[test]
fn test_simd_deduplication() {
let config = SimdBatchConfig::default();
let mut processor = SimdBatchProcessor::new(config);
let events = vec![
create_test_event("subject_1", "10"),
create_test_event("subject_1", "10"), create_test_event("subject_2", "20"),
create_test_event("subject_1", "10"), ];
let unique = processor.deduplicate_batch(&events).unwrap();
assert_eq!(unique.len(), 2); }
#[test]
fn test_simd_moving_average() {
let config = SimdBatchConfig::default();
let mut processor = SimdBatchProcessor::new(config);
let events: Vec<StreamEvent> = (1..=10)
.map(|i| create_test_event(&format!("subject_{}", i), &i.to_string()))
.collect();
let moving_avg = processor.moving_average(&events, "object", 3).unwrap();
assert_eq!(moving_avg.len(), 8); assert!((moving_avg[0] - 2.0).abs() < 0.01); }
#[test]
fn test_simd_event_filter() {
let config = SimdBatchConfig::default();
let mut filter = SimdEventFilter::new(config);
filter.add_predicate(|e| matches!(e, StreamEvent::TripleAdded { .. }));
let events = vec![
create_test_event("subject_1", "10"),
create_test_event("subject_2", "20"),
];
let filtered = filter.filter_batch(&events);
assert_eq!(filtered.len(), 2);
}
}