use scirs2_core::ndarray::ArrayView1;
use scirs2_core::numeric::{Float, NumAssign, SparseElement};
use std::iter::Sum;
use scirs2_core::parallel_ops::*;
use scirs2_core::simd_ops::SimdUnifiedOps;
/// Tuning knobs for the chunked/SIMD vector kernels in this module.
///
/// Passed as `Option<ParallelVectorOptions>` to each kernel; `None` selects
/// the `Default` values.
#[derive(Debug, Clone)]
pub struct ParallelVectorOptions {
    /// Enable the chunked ("parallel") execution path for large inputs.
    pub use_parallel: bool,
    /// Minimum element count before the chunked path is taken.
    pub parallel_threshold: usize,
    /// Number of elements per chunk in the chunked path.
    pub chunk_size: usize,
    /// Enable the SIMD kernels (`T::simd_*`) where available.
    pub use_simd: bool,
    /// Minimum slice length before a SIMD kernel is used.
    pub simd_threshold: usize,
}
impl Default for ParallelVectorOptions {
    /// Defaults: chunked path for inputs of >= 10_000 elements, processed in
    /// 1024-element chunks; SIMD for slices of >= 32 elements.
    fn default() -> Self {
        Self {
            use_parallel: true,
            parallel_threshold: 10000,
            chunk_size: 1024,
            use_simd: true,
            simd_threshold: 32,
        }
    }
}
/// Computes the dot product `Σ xᵢ·yᵢ` of two equal-length slices.
///
/// With the chunked path enabled (see [`ParallelVectorOptions`]), matching
/// chunks of both inputs are reduced independently via `parallel_map` and the
/// partial sums are combined; sufficiently long slices additionally use
/// `T::simd_dot`. Empty inputs yield `T::sparse_zero()`.
///
/// # Panics
/// Panics if `x.len() != y.len()`.
#[allow(dead_code)]
pub fn parallel_dot<T>(x: &[T], y: &[T], options: Option<ParallelVectorOptions>) -> T
where
    T: Float + SparseElement + NumAssign + Sum + Copy + Send + Sync + SimdUnifiedOps,
{
    assert_eq!(
        x.len(),
        y.len(),
        "Vector lengths must be equal for dot product"
    );
    if x.is_empty() {
        return T::sparse_zero();
    }
    let opts = options.unwrap_or_default();
    let n = x.len();
    if opts.use_parallel && n >= opts.parallel_threshold {
        // Pair up matching chunks of both inputs, reduce each pair on its own.
        let paired: Vec<_> = x
            .chunks(opts.chunk_size)
            .zip(y.chunks(opts.chunk_size))
            .collect();
        let partials: Vec<T> = parallel_map(&paired, |(xc, yc)| {
            if opts.use_simd && xc.len() >= opts.simd_threshold {
                let xv = ArrayView1::from(xc);
                let yv = ArrayView1::from(*yc);
                T::simd_dot(&xv, &yv)
            } else {
                xc.iter().zip(yc.iter()).map(|(&a, &b)| a * b).sum()
            }
        });
        partials.into_iter().sum()
    } else if opts.use_simd && n >= opts.simd_threshold {
        T::simd_dot(&ArrayView1::from(x), &ArrayView1::from(y))
    } else {
        x.iter().zip(y).map(|(&a, &b)| a * b).sum()
    }
}
/// Computes the Euclidean (L2) norm `sqrt(Σ xᵢ²)` of a slice.
///
/// The sum of squares is accumulated per chunk via `parallel_map` when the
/// chunked path is enabled, using `T::simd_dot(v, v)` on long-enough slices,
/// then the square root is taken once at the end. Empty input yields
/// `T::sparse_zero()`.
#[allow(dead_code)]
pub fn parallel_norm2<T>(x: &[T], options: Option<ParallelVectorOptions>) -> T
where
    T: Float + SparseElement + NumAssign + Sum + Copy + Send + Sync + SimdUnifiedOps,
{
    if x.is_empty() {
        return T::sparse_zero();
    }
    let opts = options.unwrap_or_default();
    let sum_sq = if opts.use_parallel && x.len() >= opts.parallel_threshold {
        let pieces: Vec<_> = x.chunks(opts.chunk_size).collect();
        parallel_map(&pieces, |piece| {
            if opts.use_simd && piece.len() >= opts.simd_threshold {
                // A self dot product is exactly the chunk's sum of squares.
                let v = ArrayView1::from(*piece);
                T::simd_dot(&v, &v)
            } else {
                piece.iter().map(|&e| e * e).sum()
            }
        })
        .into_iter()
        .sum::<T>()
    } else if opts.use_simd && x.len() >= opts.simd_threshold {
        let v = ArrayView1::from(x);
        T::simd_dot(&v, &v)
    } else {
        x.iter().map(|&e| e * e).sum::<T>()
    };
    sum_sq.sqrt()
}
/// Element-wise addition `z = x + y`.
///
/// Consistency fix: the chunked path previously built a temporary `Vec` of
/// chunk start indices and re-sliced all three buffers manually; it now zips
/// `chunks`/`chunks_mut` exactly like `parallel_vector_sub` and the other
/// kernels in this module (identical behavior, one less allocation).
///
/// NOTE: the chunked path currently processes chunks sequentially; chunking
/// bounds the size of the SIMD temporaries.
///
/// # Panics
/// Panics if `x`, `y`, and `z` do not all have the same length.
#[allow(dead_code)]
pub fn parallel_vector_add<T>(x: &[T], y: &[T], z: &mut [T], options: Option<ParallelVectorOptions>)
where
    T: Float + SparseElement + NumAssign + Send + Sync + Copy + SimdUnifiedOps,
{
    assert_eq!(x.len(), y.len(), "Input vector lengths must be equal");
    assert_eq!(x.len(), z.len(), "Output vector length must match input");
    if x.is_empty() {
        return;
    }
    let opts = options.unwrap_or_default();
    if opts.use_parallel && x.len() >= opts.parallel_threshold {
        for ((x_chunk, y_chunk), z_chunk) in x
            .chunks(opts.chunk_size)
            .zip(y.chunks(opts.chunk_size))
            .zip(z.chunks_mut(opts.chunk_size))
        {
            if opts.use_simd && x_chunk.len() >= opts.simd_threshold {
                let x_view = ArrayView1::from(x_chunk);
                let y_view = ArrayView1::from(y_chunk);
                let result = T::simd_add(&x_view, &y_view);
                z_chunk.copy_from_slice(result.as_slice().expect("Operation failed"));
            } else {
                for ((xi, yi), zi) in x_chunk.iter().zip(y_chunk).zip(z_chunk.iter_mut()) {
                    *zi = *xi + *yi;
                }
            }
        }
    } else if opts.use_simd && x.len() >= opts.simd_threshold {
        let x_view = ArrayView1::from(x);
        let y_view = ArrayView1::from(y);
        let result = T::simd_add(&x_view, &y_view);
        z.copy_from_slice(result.as_slice().expect("Operation failed"));
    } else {
        for ((xi, yi), zi) in x.iter().zip(y).zip(z) {
            *zi = *xi + *yi;
        }
    }
}
/// Element-wise subtraction `z = x - y`.
///
/// Uses `T::simd_sub` on slices/chunks at or above `simd_threshold`, a scalar
/// loop otherwise. The chunked path walks the zipped chunks directly instead
/// of collecting them first.
///
/// # Panics
/// Panics if `x`, `y`, and `z` do not all have the same length.
#[allow(dead_code)]
pub fn parallel_vector_sub<T>(x: &[T], y: &[T], z: &mut [T], options: Option<ParallelVectorOptions>)
where
    T: Float + SparseElement + NumAssign + Send + Sync + Copy + SimdUnifiedOps,
{
    assert_eq!(x.len(), y.len(), "Input vector lengths must be equal");
    assert_eq!(x.len(), z.len(), "Output vector length must match input");
    if x.is_empty() {
        return;
    }
    let opts = options.unwrap_or_default();
    if opts.use_parallel && x.len() >= opts.parallel_threshold {
        for ((xc, yc), zc) in x
            .chunks(opts.chunk_size)
            .zip(y.chunks(opts.chunk_size))
            .zip(z.chunks_mut(opts.chunk_size))
        {
            if opts.use_simd && xc.len() >= opts.simd_threshold {
                let diff = T::simd_sub(&ArrayView1::from(xc), &ArrayView1::from(yc));
                zc.copy_from_slice(diff.as_slice().expect("Operation failed"));
            } else {
                for ((&xi, &yi), zi) in xc.iter().zip(yc).zip(zc.iter_mut()) {
                    *zi = xi - yi;
                }
            }
        }
    } else if opts.use_simd && x.len() >= opts.simd_threshold {
        let diff = T::simd_sub(&ArrayView1::from(x), &ArrayView1::from(y));
        z.copy_from_slice(diff.as_slice().expect("Operation failed"));
    } else {
        for ((&xi, &yi), zi) in x.iter().zip(y).zip(z.iter_mut()) {
            *zi = xi - yi;
        }
    }
}
/// In-place AXPY update: `y = a·x + y`.
///
/// The SIMD path scales `x` by `a`, adds the current `y` values, and copies
/// the result back into `y`; the scalar path updates each element directly.
///
/// # Panics
/// Panics if `x.len() != y.len()`.
#[allow(dead_code)]
pub fn parallel_axpy<T>(a: T, x: &[T], y: &mut [T], options: Option<ParallelVectorOptions>)
where
    T: Float + SparseElement + NumAssign + Send + Sync + Copy + SimdUnifiedOps,
{
    assert_eq!(x.len(), y.len(), "Vector lengths must be equal for AXPY");
    if x.is_empty() {
        return;
    }
    let opts = options.unwrap_or_default();
    if opts.use_parallel && x.len() >= opts.parallel_threshold {
        for (xc, yc) in x.chunks(opts.chunk_size).zip(y.chunks_mut(opts.chunk_size)) {
            if opts.use_simd && xc.len() >= opts.simd_threshold {
                // a·x for this chunk, then add the chunk's current y values.
                let scaled = T::simd_scalar_mul(&ArrayView1::from(xc), a);
                let updated = T::simd_add(&scaled.view(), &ArrayView1::from(&yc[..]));
                yc.copy_from_slice(updated.as_slice().expect("Operation failed"));
            } else {
                for (&xi, yi) in xc.iter().zip(yc) {
                    *yi = a * xi + *yi;
                }
            }
        }
    } else if opts.use_simd && x.len() >= opts.simd_threshold {
        let scaled = T::simd_scalar_mul(&ArrayView1::from(x), a);
        let updated = T::simd_add(&scaled.view(), &ArrayView1::from(&y[..]));
        y.copy_from_slice(updated.as_slice().expect("Operation failed"));
    } else {
        for (&xi, yi) in x.iter().zip(y) {
            *yi = a * xi + *yi;
        }
    }
}
/// Scales a vector by a constant: `y = a·x`.
///
/// Uses `T::simd_scalar_mul` on slices/chunks at or above `simd_threshold`,
/// a scalar loop otherwise.
///
/// # Panics
/// Panics if `x.len() != y.len()`.
#[allow(dead_code)]
pub fn parallel_vector_scale<T>(a: T, x: &[T], y: &mut [T], options: Option<ParallelVectorOptions>)
where
    T: Float + SparseElement + NumAssign + Send + Sync + Copy + SimdUnifiedOps,
{
    assert_eq!(x.len(), y.len(), "Vector lengths must be equal for scaling");
    if x.is_empty() {
        return;
    }
    let opts = options.unwrap_or_default();
    if opts.use_parallel && x.len() >= opts.parallel_threshold {
        for (xc, yc) in x.chunks(opts.chunk_size).zip(y.chunks_mut(opts.chunk_size)) {
            if opts.use_simd && xc.len() >= opts.simd_threshold {
                let scaled = T::simd_scalar_mul(&ArrayView1::from(xc), a);
                yc.copy_from_slice(scaled.as_slice().expect("Operation failed"));
            } else {
                for (&xi, yi) in xc.iter().zip(yc) {
                    *yi = a * xi;
                }
            }
        }
    } else if opts.use_simd && x.len() >= opts.simd_threshold {
        let scaled = T::simd_scalar_mul(&ArrayView1::from(x), a);
        y.copy_from_slice(scaled.as_slice().expect("Operation failed"));
    } else {
        for (&xi, yi) in x.iter().zip(y) {
            *yi = a * xi;
        }
    }
}
/// Copies `x` into `y`.
///
/// Above `parallel_threshold` the copy is performed chunk by chunk; otherwise
/// a single `copy_from_slice` is used. Both paths produce identical results.
///
/// # Panics
/// Panics if `x.len() != y.len()`.
#[allow(dead_code)]
pub fn parallel_vector_copy<T>(x: &[T], y: &mut [T], options: Option<ParallelVectorOptions>)
where
    T: Float + SparseElement + NumAssign + Send + Sync + Copy,
{
    assert_eq!(x.len(), y.len(), "Vector lengths must be equal for copy");
    if x.is_empty() {
        return;
    }
    let opts = options.unwrap_or_default();
    if opts.use_parallel && x.len() >= opts.parallel_threshold {
        for (src, dst) in x.chunks(opts.chunk_size).zip(y.chunks_mut(opts.chunk_size)) {
            dst.copy_from_slice(src);
        }
    } else {
        y.copy_from_slice(x);
    }
}
/// Computes the linear combination `z = a·x + b·y` element-wise.
///
/// The SIMD path forms `a·x` and `b·y` separately and adds them; the scalar
/// path evaluates `a·xᵢ + b·yᵢ` per element.
///
/// # Panics
/// Panics if `x`, `y`, and `z` do not all have the same length.
#[allow(dead_code)]
#[allow(clippy::too_many_arguments)]
pub fn parallel_linear_combination<T>(
    a: T,
    x: &[T],
    b: T,
    y: &[T],
    z: &mut [T],
    options: Option<ParallelVectorOptions>,
) where
    T: Float + SparseElement + NumAssign + Send + Sync + Copy + SimdUnifiedOps,
{
    assert_eq!(x.len(), y.len(), "Input vector lengths must be equal");
    assert_eq!(x.len(), z.len(), "Output vector length must match input");
    if x.is_empty() {
        return;
    }
    let opts = options.unwrap_or_default();
    if opts.use_parallel && x.len() >= opts.parallel_threshold {
        let xs = x.chunks(opts.chunk_size);
        let ys = y.chunks(opts.chunk_size);
        let zs = z.chunks_mut(opts.chunk_size);
        for ((xc, yc), zc) in xs.zip(ys).zip(zs) {
            if opts.use_simd && xc.len() >= opts.simd_threshold {
                let ax = T::simd_scalar_mul(&ArrayView1::from(xc), a);
                let by = T::simd_scalar_mul(&ArrayView1::from(yc), b);
                let combined = T::simd_add(&ax.view(), &by.view());
                zc.copy_from_slice(combined.as_slice().expect("Operation failed"));
            } else {
                for ((&xi, &yi), zi) in xc.iter().zip(yc).zip(zc.iter_mut()) {
                    *zi = a * xi + b * yi;
                }
            }
        }
    } else if opts.use_simd && x.len() >= opts.simd_threshold {
        let ax = T::simd_scalar_mul(&ArrayView1::from(x), a);
        let by = T::simd_scalar_mul(&ArrayView1::from(y), b);
        let combined = T::simd_add(&ax.view(), &by.view());
        z.copy_from_slice(combined.as_slice().expect("Operation failed"));
    } else {
        for ((&xi, &yi), zi) in x.iter().zip(y).zip(z.iter_mut()) {
            *zi = a * xi + b * yi;
        }
    }
}
/// Sparse CSR matrix–vector product `y = A·x`.
///
/// `indptr`, `indices`, and `data` describe the matrix in CSR form; `rows` is
/// the number of matrix rows. Rows with no stored entries produce
/// `T::sparse_zero()`.
///
/// Fix: the previous 8-wide manual "SIMD" branch first gathered the x-values
/// into a temporary `[T; 8]` and then multiplied them in the same scalar
/// order as the plain loop — strictly more work for a numerically identical
/// result. Both branches are replaced by one zip-based row kernel that
/// preserves the exact accumulation order.
///
/// NOTE: the chunked path currently processes row chunks sequentially; the
/// chunk structure is kept so a parallel dispatch can be slotted in.
///
/// # Panics
/// Panics if `y.len() != rows`, `indptr.len() != rows + 1`, or
/// `indices.len() != data.len()`.
#[allow(dead_code)]
#[allow(clippy::too_many_arguments)]
pub fn parallel_sparse_matvec_csr<T>(
    y: &mut [T],
    rows: usize,
    indptr: &[usize],
    indices: &[usize],
    data: &[T],
    x: &[T],
    options: Option<ParallelVectorOptions>,
) where
    T: Float + SparseElement + NumAssign + Send + Sync + Copy + SimdUnifiedOps,
{
    assert_eq!(
        y.len(),
        rows,
        "Output vector length must match number of rows"
    );
    assert_eq!(indptr.len(), rows + 1, "indptr length must be rows + 1");
    assert_eq!(
        indices.len(),
        data.len(),
        "indices and data must have same length"
    );
    if rows == 0 {
        return;
    }
    // Accumulate one row of the product; zipping the row's data/index slices
    // avoids per-element bounds checks on `indices`/`data`.
    let row_product = |row: usize| {
        let lo = indptr[row];
        let hi = indptr[row + 1];
        let mut sum = T::sparse_zero();
        for (&val, &col) in data[lo..hi].iter().zip(&indices[lo..hi]) {
            sum += val * x[col];
        }
        sum
    };
    let opts = options.unwrap_or_default();
    if opts.use_parallel && rows >= opts.parallel_threshold {
        for start_row in (0..rows).step_by(opts.chunk_size) {
            let end_row = (start_row + opts.chunk_size).min(rows);
            for row in start_row..end_row {
                y[row] = row_product(row);
            }
        }
    } else {
        for row in 0..rows {
            y[row] = row_product(row);
        }
    }
}
/// Heuristic dispatcher for CSR SpMV: inspects the distribution of stored
/// entries per row and routes to the kernel best suited to it.
///
/// Very irregular row lengths → nnz-balanced kernel; dense-ish rows
/// (average > 64 nnz) → unrolled kernel; otherwise → cache-blocked kernel.
#[allow(dead_code)]
#[allow(clippy::too_many_arguments)]
pub fn advanced_sparse_matvec_csr<T>(
    y: &mut [T],
    rows: usize,
    indptr: &[usize],
    indices: &[usize],
    data: &[T],
    x: &[T],
) where
    T: Float + SparseElement + NumAssign + Send + Sync + Copy + SimdUnifiedOps,
{
    let total_nnz = data.len();
    let avg_nnz_per_row = total_nnz.checked_div(rows).unwrap_or(0);
    // Per-row stored-entry counts from consecutive indptr differences.
    let row_nnz_counts: Vec<usize> = (0..rows).map(|r| indptr[r + 1] - indptr[r]).collect();
    let _max_nnz_per_row = row_nnz_counts.iter().max().copied().unwrap_or(0);
    // Sample standard deviation of the row lengths (despite the name);
    // 0.0 when there are fewer than two rows.
    let sparsity_variance = if rows > 1 {
        let mean = avg_nnz_per_row as f64;
        let squared_deviations: f64 = row_nnz_counts
            .iter()
            .map(|&c| (c as f64 - mean).powi(2))
            .sum();
        (squared_deviations / (rows - 1) as f64).sqrt()
    } else {
        0.0
    };
    if sparsity_variance > (avg_nnz_per_row as f64) * 2.0 {
        advanced_adaptive_load_balanced_spmv(y, rows, indptr, indices, data, x, &row_nnz_counts);
    } else if avg_nnz_per_row > 64 {
        advanced_vectorized_spmv(y, rows, indptr, indices, data, x);
    } else {
        advanced_cache_optimized_spmv(y, rows, indptr, indices, data, x);
    }
}
/// Load-balanced CSR SpMV for matrices with highly irregular row lengths.
///
/// Rows are grouped into chunks of roughly equal stored-entry (nnz) count,
/// each chunk's row sums are computed via `parallel_map`, and the partial
/// results are scattered into `y`.
///
/// Fix: the previous version ran the parallel pass but discarded every
/// computed sum, then recomputed all rows sequentially — doubling the work.
/// The parallel partials are now written to `y` and the redundant sequential
/// pass is removed. `target_nnz_per_chunk` is also clamped to at least 1 so
/// an almost-empty matrix does not degenerate into one chunk per row.
#[allow(dead_code)]
fn advanced_adaptive_load_balanced_spmv<T>(
    y: &mut [T],
    rows: usize,
    indptr: &[usize],
    indices: &[usize],
    data: &[T],
    x: &[T],
    row_nnz_counts: &[usize],
) where
    T: Float + SparseElement + NumAssign + Send + Sync + Copy,
{
    let total_nnz = data.len();
    // Aim for one chunk's worth of nnz per available core.
    let target_nnz_per_chunk = (total_nnz / num_cpus::get().max(1)).max(1);
    let mut work_chunks = Vec::new();
    let mut chunk_start = 0;
    let mut chunk_nnz = 0;
    for (row, &nnz) in row_nnz_counts.iter().enumerate() {
        chunk_nnz += nnz;
        if chunk_nnz >= target_nnz_per_chunk || row == rows - 1 {
            work_chunks.push((chunk_start, row + 1));
            chunk_start = row + 1;
            chunk_nnz = 0;
        }
    }
    // Compute each chunk's row sums in parallel ...
    let partials: Vec<(usize, Vec<T>)> = parallel_map(&work_chunks, |&(start_row, end_row)| {
        let sums: Vec<T> = (start_row..end_row)
            .map(|row| {
                let mut sum = T::sparse_zero();
                for idx in indptr[row]..indptr[row + 1] {
                    sum += data[idx] * x[indices[idx]];
                }
                sum
            })
            .collect();
        (start_row, sums)
    });
    // ... then scatter them into the output vector.
    for (start_row, sums) in partials {
        y[start_row..start_row + sums.len()].copy_from_slice(&sums);
    }
}
/// CSR SpMV kernel for rows with many stored entries.
///
/// Each row is accumulated in groups of eight (a per-group partial sum folded
/// into the row total) to expose instruction-level parallelism; the tail and
/// short rows (< 8 nnz) are accumulated directly, matching the original
/// floating-point summation order exactly.
#[allow(dead_code)]
fn advanced_vectorized_spmv<T>(
    y: &mut [T],
    rows: usize,
    indptr: &[usize],
    indices: &[usize],
    data: &[T],
    x: &[T],
) where
    T: Float + SparseElement + NumAssign + Send + Sync + Copy + SimdUnifiedOps,
{
    for row in 0..rows {
        let row_data = &data[indptr[row]..indptr[row + 1]];
        let row_indices = &indices[indptr[row]..indptr[row + 1]];
        let mut sum = T::sparse_zero();
        let data_groups = row_data.chunks_exact(8);
        let index_groups = row_indices.chunks_exact(8);
        let (data_tail, index_tail) = (data_groups.remainder(), index_groups.remainder());
        for (vals, cols) in data_groups.zip(index_groups) {
            // Per-group partial sum, folded into the row total afterwards.
            let mut group_sum = T::sparse_zero();
            for (&v, &c) in vals.iter().zip(cols) {
                group_sum += v * x[c];
            }
            sum += group_sum;
        }
        for (&v, &c) in data_tail.iter().zip(index_tail) {
            sum += v * x[c];
        }
        y[row] = sum;
    }
}
/// CSR SpMV kernel that walks rows in fixed-size blocks of 64 so the touched
/// portions of `y` and `indptr` stay cache-resident between rows.
#[allow(dead_code)]
fn advanced_cache_optimized_spmv<T>(
    y: &mut [T],
    rows: usize,
    indptr: &[usize],
    indices: &[usize],
    data: &[T],
    x: &[T],
) where
    T: Float + SparseElement + NumAssign + Send + Sync + Copy,
{
    const CACHE_BLOCK_SIZE: usize = 64;
    let mut block_start = 0;
    while block_start < rows {
        let block_end = rows.min(block_start + CACHE_BLOCK_SIZE);
        for row in block_start..block_end {
            let (lo, hi) = (indptr[row], indptr[row + 1]);
            let mut acc = T::sparse_zero();
            for idx in lo..hi {
                acc += data[idx] * x[indices[idx]];
            }
            y[row] = acc;
        }
        block_start = block_end;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use approx::assert_relative_eq;

    #[test]
    fn test_parallel_dot() {
        let x = vec![1.0, 2.0, 3.0, 4.0];
        let y = vec![2.0, 3.0, 4.0, 5.0];
        // 1·2 + 2·3 + 3·4 + 4·5 = 40
        assert_relative_eq!(parallel_dot(&x, &y, None), 40.0);
    }

    #[test]
    fn test_parallel_norm2() {
        // 3-4-5 triangle.
        let v = vec![3.0, 4.0];
        assert_relative_eq!(parallel_norm2(&v, None), 5.0);
    }

    #[test]
    fn test_parallel_vector_add() {
        let (x, y) = (vec![1.0, 2.0, 3.0], vec![4.0, 5.0, 6.0]);
        let mut z = vec![0.0; 3];
        parallel_vector_add(&x, &y, &mut z, None);
        for (got, want) in z.iter().zip([5.0, 7.0, 9.0]) {
            assert_relative_eq!(*got, want);
        }
    }

    #[test]
    fn test_parallel_axpy() {
        let x = vec![1.0, 2.0, 3.0];
        let mut y = vec![1.0, 1.0, 1.0];
        parallel_axpy(2.0, &x, &mut y, None);
        for (got, want) in y.iter().zip([3.0, 5.0, 7.0]) {
            assert_relative_eq!(*got, want);
        }
    }

    #[test]
    fn test_parallel_vector_scale() {
        let x = vec![1.0, 2.0, 3.0];
        let mut y = vec![0.0; 3];
        parallel_vector_scale(3.0, &x, &mut y, None);
        for (got, want) in y.iter().zip([3.0, 6.0, 9.0]) {
            assert_relative_eq!(*got, want);
        }
    }

    #[test]
    fn test_parallel_linear_combination() {
        let x = vec![1.0, 2.0];
        let y = vec![1.0, 1.0];
        let mut z = vec![0.0; 2];
        // z = 2·x + 3·y
        parallel_linear_combination(2.0, &x, 3.0, &y, &mut z, None);
        assert_relative_eq!(z[0], 5.0);
        assert_relative_eq!(z[1], 7.0);
    }

    #[test]
    fn test_empty_vectors() {
        let empty: Vec<f64> = Vec::new();
        assert_eq!(parallel_dot(&empty, &empty, None), 0.0);
        assert_eq!(parallel_norm2(&empty, None), 0.0);
    }

    #[test]
    fn test_large_vectors_trigger_parallel() {
        // Force the chunked path with a low threshold.
        let opts = ParallelVectorOptions {
            parallel_threshold: 100,
            ..Default::default()
        };
        let x: Vec<f64> = (0..1000).map(|i| i as f64).collect();
        let y: Vec<f64> = (0..1000).map(|i| (i + 1) as f64).collect();
        let expected: f64 = (0..1000).map(|i| (i * (i + 1)) as f64).sum();
        assert_relative_eq!(parallel_dot(&x, &y, Some(opts)), expected, epsilon = 1e-10);
    }
}