#[cfg(all(feature = "simd", not(target_arch = "wasm32")))]
use crate::performance::optimized_simd::OptimizedSimdOps;
use crate::VeloxxError;
use rayon::prelude::*;
use std::sync::Arc;
pub struct AdvancedParallelExecutor {
thread_count: usize,
chunk_size: usize,
_queue_depth: usize,
}
impl AdvancedParallelExecutor {
pub fn new() -> Self {
let thread_count = rayon::current_num_threads();
Self {
thread_count,
chunk_size: 8192, _queue_depth: thread_count * 4,
}
}
pub fn with_config(thread_count: usize, chunk_size: usize) -> Self {
Self {
thread_count,
chunk_size,
_queue_depth: thread_count * 4,
}
}
pub fn parallel_simd_add(
&self,
a: &[f64],
b: &[f64],
result: &mut [f64],
) -> Result<(), VeloxxError> {
assert_eq!(a.len(), b.len());
assert_eq!(a.len(), result.len());
let len = a.len();
#[cfg(all(feature = "simd", not(target_arch = "wasm32")))]
{
if len < self.chunk_size * 2 {
a.optimized_simd_add(b, result);
return Ok(());
}
let chunk_size = self.chunk_size;
result
.par_chunks_mut(chunk_size)
.zip(a.par_chunks(chunk_size))
.zip(b.par_chunks(chunk_size))
.try_for_each(
|((chunk_result, chunk_a), chunk_b)| -> Result<(), VeloxxError> {
chunk_a.optimized_simd_add(chunk_b, chunk_result);
Ok(())
},
)?;
}
#[cfg(not(all(feature = "simd", not(target_arch = "wasm32"))))]
{
for i in 0..len {
result[i] = a[i] + b[i];
}
}
Ok(())
}
pub fn parallel_simd_mul(
&self,
a: &[f64],
b: &[f64],
result: &mut [f64],
) -> Result<(), VeloxxError> {
assert_eq!(a.len(), b.len());
assert_eq!(a.len(), result.len());
let len = a.len();
#[cfg(all(feature = "simd", not(target_arch = "wasm32")))]
{
if len < self.chunk_size * 2 {
a.optimized_simd_mul(b, result);
return Ok(());
}
result
.par_chunks_mut(self.chunk_size)
.zip(a.par_chunks(self.chunk_size))
.zip(b.par_chunks(self.chunk_size))
.try_for_each(
|((chunk_result, chunk_a), chunk_b)| -> Result<(), VeloxxError> {
chunk_a.optimized_simd_mul(chunk_b, chunk_result);
Ok(())
},
)?;
}
#[cfg(not(all(feature = "simd", not(target_arch = "wasm32"))))]
{
for i in 0..len {
result[i] = a[i] * b[i];
}
}
Ok(())
}
pub fn parallel_simd_sum(&self, data: &[f64]) -> Result<f64, VeloxxError> {
let len = data.len();
#[cfg(all(feature = "simd", not(target_arch = "wasm32")))]
{
if len < self.chunk_size * 2 {
return Ok(data.optimized_simd_sum());
}
let chunk_size = self.chunk_size;
let num_chunks = len.div_ceil(chunk_size);
let partial_sums: Result<Vec<f64>, VeloxxError> = (0..num_chunks)
.into_par_iter()
.map(|chunk_idx| -> Result<f64, VeloxxError> {
let start = chunk_idx * chunk_size;
let end = std::cmp::min(start + chunk_size, len);
let chunk = &data[start..end];
Ok(chunk.optimized_simd_sum())
})
.collect();
let sums = partial_sums?;
Ok(sums.iter().sum())
}
#[cfg(not(all(feature = "simd", not(target_arch = "wasm32"))))]
{
Ok(data.iter().sum())
}
}
pub fn parallel_fused_add_mul(
&self,
a: &[f64],
b: &[f64],
c: &[f64],
result: &mut [f64],
) -> Result<(), VeloxxError> {
assert_eq!(a.len(), b.len());
assert_eq!(a.len(), c.len());
assert_eq!(a.len(), result.len());
let len = a.len();
if len < self.chunk_size * 2 {
self.sequential_fused_add_mul(a, b, c, result)?;
return Ok(());
}
result
.par_chunks_mut(self.chunk_size)
.zip(a.par_chunks(self.chunk_size))
.zip(b.par_chunks(self.chunk_size))
.zip(c.par_chunks(self.chunk_size))
.try_for_each(
|(((chunk_result, chunk_a), chunk_b), chunk_c)| -> Result<(), VeloxxError> {
self.sequential_fused_add_mul(chunk_a, chunk_b, chunk_c, chunk_result)?;
Ok(())
},
)?;
Ok(())
}
fn sequential_fused_add_mul(
&self,
a: &[f64],
b: &[f64],
c: &[f64],
result: &mut [f64],
) -> Result<(), VeloxxError> {
let len = a.len();
#[cfg(all(feature = "simd", not(target_arch = "wasm32")))]
{
use wide::f64x4;
let simd_len = len / 4;
let remainder = len % 4;
for i in 0..simd_len {
let a_vec = f64x4::from([a[i * 4], a[i * 4 + 1], a[i * 4 + 2], a[i * 4 + 3]]);
let b_vec = f64x4::from([b[i * 4], b[i * 4 + 1], b[i * 4 + 2], b[i * 4 + 3]]);
let c_vec = f64x4::from([c[i * 4], c[i * 4 + 1], c[i * 4 + 2], c[i * 4 + 3]]);
let result_vec = (a_vec + b_vec) * c_vec;
let result_array = result_vec.to_array();
result[i * 4] = result_array[0];
result[i * 4 + 1] = result_array[1];
result[i * 4 + 2] = result_array[2];
result[i * 4 + 3] = result_array[3];
}
for i in (len - remainder)..len {
result[i] = (a[i] + b[i]) * c[i];
}
}
#[cfg(not(all(feature = "simd", not(target_arch = "wasm32"))))]
{
for i in 0..len {
result[i] = (a[i] + b[i]) * c[i];
}
}
Ok(())
}
pub fn adaptive_chunk_size(&self, data_size: usize) -> usize {
let base_chunk = self.chunk_size;
let thread_count = self.thread_count;
if data_size < base_chunk * thread_count {
std::cmp::max(data_size / thread_count, 1024)
} else if data_size > base_chunk * thread_count * 8 {
base_chunk / 2
} else {
base_chunk
}
}
pub fn numa_aware_process<F>(&self, data_size: usize, processor: F) -> Result<(), VeloxxError>
where
F: Fn(usize, usize) -> Result<(), VeloxxError> + Sync + Send,
{
let chunk_size = self.adaptive_chunk_size(data_size);
let num_chunks = data_size.div_ceil(chunk_size);
(0..num_chunks)
.into_par_iter()
.try_for_each(|chunk_idx| -> Result<(), VeloxxError> {
let start = chunk_idx * chunk_size;
let end = std::cmp::min(start + chunk_size, data_size);
processor(start, end)
})?;
Ok(())
}
}
impl Default for AdvancedParallelExecutor {
fn default() -> Self {
Self::new()
}
}
static GLOBAL_EXECUTOR: std::sync::OnceLock<Arc<AdvancedParallelExecutor>> =
std::sync::OnceLock::new();
pub fn global_executor() -> &'static Arc<AdvancedParallelExecutor> {
GLOBAL_EXECUTOR.get_or_init(|| Arc::new(AdvancedParallelExecutor::new()))
}
pub fn parallel_simd_add_advanced(
a: &[f64],
b: &[f64],
result: &mut [f64],
) -> Result<(), VeloxxError> {
global_executor().parallel_simd_add(a, b, result)
}
pub fn parallel_simd_mul_advanced(
a: &[f64],
b: &[f64],
result: &mut [f64],
) -> Result<(), VeloxxError> {
global_executor().parallel_simd_mul(a, b, result)
}
pub fn parallel_simd_sum_advanced(data: &[f64]) -> Result<f64, VeloxxError> {
global_executor().parallel_simd_sum(data)
}
pub fn parallel_fused_add_mul_advanced(
a: &[f64],
b: &[f64],
c: &[f64],
result: &mut [f64],
) -> Result<(), VeloxxError> {
global_executor().parallel_fused_add_mul(a, b, c, result)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parallel_simd_add() {
let executor = AdvancedParallelExecutor::new();
let size = 10000;
let a: Vec<f64> = (0..size).map(|i| i as f64).collect();
let b: Vec<f64> = (0..size).map(|i| (i * 2) as f64).collect();
let mut result = vec![0.0; size];
executor.parallel_simd_add(&a, &b, &mut result).unwrap();
for i in 0..size {
assert_eq!(result[i], a[i] + b[i]);
}
}
#[test]
fn test_parallel_fused_add_mul() {
let executor = AdvancedParallelExecutor::new();
let size = 8192;
let a: Vec<f64> = (0..size).map(|i| i as f64).collect();
let b: Vec<f64> = (0..size).map(|i| (i + 1) as f64).collect();
let c: Vec<f64> = (0..size).map(|_| 2.0).collect();
let mut result = vec![0.0; size];
executor
.parallel_fused_add_mul(&a, &b, &c, &mut result)
.unwrap();
for i in 0..size {
let expected = (a[i] + b[i]) * c[i];
assert_eq!(result[i], expected);
}
}
#[test]
fn test_adaptive_chunk_size() {
let executor = AdvancedParallelExecutor::new();
assert!(executor.adaptive_chunk_size(1000) >= 1024);
let medium_size = executor.chunk_size * executor.thread_count * 4;
assert_eq!(
executor.adaptive_chunk_size(medium_size),
executor.chunk_size
);
let large_size = executor.chunk_size * executor.thread_count * 16;
assert!(executor.adaptive_chunk_size(large_size) < executor.chunk_size);
}
}