veloxx 0.4.0

Veloxx: High-performance, lightweight Rust library for in-memory data processing and analytics. Features DataFrames, Series, advanced I/O (CSV, JSON, Parquet), machine learning (linear regression, K-means, logistic regression), time-series analysis, data visualization, parallel processing, and multi-platform bindings (Python, WebAssembly). Designed for minimal dependencies, optimal memory usage, and blazing speed - ideal for data science, analytics, and performance-critical applications.
Documentation
#[cfg(all(feature = "simd", not(target_arch = "wasm32")))]
use crate::performance::optimized_simd::OptimizedSimdOps;
use crate::VeloxxError;
use rayon::prelude::*;
use std::sync::Arc;

/// Advanced parallel executor with work-stealing and NUMA awareness
pub struct AdvancedParallelExecutor {
    /// Number of worker threads
    thread_count: usize,
    /// Chunk size for work distribution
    chunk_size: usize,
    /// Work-stealing queue depth
    _queue_depth: usize,
}

impl AdvancedParallelExecutor {
    /// Create a new advanced parallel executor
    pub fn new() -> Self {
        let thread_count = rayon::current_num_threads();
        Self {
            thread_count,
            chunk_size: 8192, // Optimized for cache locality
            _queue_depth: thread_count * 4,
        }
    }

    /// Create with specific configuration
    pub fn with_config(thread_count: usize, chunk_size: usize) -> Self {
        Self {
            thread_count,
            chunk_size,
            _queue_depth: thread_count * 4,
        }
    }

    /// Execute SIMD addition with work-stealing parallelization
    pub fn parallel_simd_add(
        &self,
        a: &[f64],
        b: &[f64],
        result: &mut [f64],
    ) -> Result<(), VeloxxError> {
        assert_eq!(a.len(), b.len());
        assert_eq!(a.len(), result.len());

        let len = a.len();

        #[cfg(all(feature = "simd", not(target_arch = "wasm32")))]
        {
            // Use threshold to decide between parallel and sequential execution
            if len < self.chunk_size * 2 {
                // For small arrays, sequential SIMD is faster due to overhead
                a.optimized_simd_add(b, result);
                return Ok(());
            }

            // Advanced parallel execution with work-stealing
            let chunk_size = self.chunk_size;

            // Use rayon's parallel chunks for safe parallel mutation
            result
                .par_chunks_mut(chunk_size)
                .zip(a.par_chunks(chunk_size))
                .zip(b.par_chunks(chunk_size))
                .try_for_each(
                    |((chunk_result, chunk_a), chunk_b)| -> Result<(), VeloxxError> {
                        chunk_a.optimized_simd_add(chunk_b, chunk_result);
                        Ok(())
                    },
                )?;
        }
        #[cfg(not(all(feature = "simd", not(target_arch = "wasm32"))))]
        {
            // Fallback implementation for WASM or when SIMD is not available
            for i in 0..len {
                result[i] = a[i] + b[i];
            }
        }

        Ok(())
    }

    /// Execute SIMD multiplication with advanced parallelization
    pub fn parallel_simd_mul(
        &self,
        a: &[f64],
        b: &[f64],
        result: &mut [f64],
    ) -> Result<(), VeloxxError> {
        assert_eq!(a.len(), b.len());
        assert_eq!(a.len(), result.len());

        let len = a.len();

        #[cfg(all(feature = "simd", not(target_arch = "wasm32")))]
        {
            if len < self.chunk_size * 2 {
                a.optimized_simd_mul(b, result);
                return Ok(());
            }

            // Use rayon's parallel chunks for safe parallel mutation
            result
                .par_chunks_mut(self.chunk_size)
                .zip(a.par_chunks(self.chunk_size))
                .zip(b.par_chunks(self.chunk_size))
                .try_for_each(
                    |((chunk_result, chunk_a), chunk_b)| -> Result<(), VeloxxError> {
                        chunk_a.optimized_simd_mul(chunk_b, chunk_result);
                        Ok(())
                    },
                )?;
        }
        #[cfg(not(all(feature = "simd", not(target_arch = "wasm32"))))]
        {
            // Fallback implementation for WASM or when SIMD is not available
            for i in 0..len {
                result[i] = a[i] * b[i];
            }
        }

        Ok(())
    }

    /// Parallel SIMD sum with tree reduction
    pub fn parallel_simd_sum(&self, data: &[f64]) -> Result<f64, VeloxxError> {
        let len = data.len();

        #[cfg(all(feature = "simd", not(target_arch = "wasm32")))]
        {
            if len < self.chunk_size * 2 {
                return Ok(data.optimized_simd_sum());
            }

            let chunk_size = self.chunk_size;
            let num_chunks = len.div_ceil(chunk_size);

            // Parallel sum with tree reduction
            let partial_sums: Result<Vec<f64>, VeloxxError> = (0..num_chunks)
                .into_par_iter()
                .map(|chunk_idx| -> Result<f64, VeloxxError> {
                    let start = chunk_idx * chunk_size;
                    let end = std::cmp::min(start + chunk_size, len);
                    let chunk = &data[start..end];
                    Ok(chunk.optimized_simd_sum())
                })
                .collect();

            let sums = partial_sums?;

            // Final reduction (sequential for small number of partial sums)
            Ok(sums.iter().sum())
        }
        #[cfg(not(all(feature = "simd", not(target_arch = "wasm32"))))]
        {
            // Fallback implementation for WASM or when SIMD is not available
            Ok(data.iter().sum())
        }
    }

    /// Advanced fused operation: (a + b) * c with single-pass parallelization
    pub fn parallel_fused_add_mul(
        &self,
        a: &[f64],
        b: &[f64],
        c: &[f64],
        result: &mut [f64],
    ) -> Result<(), VeloxxError> {
        assert_eq!(a.len(), b.len());
        assert_eq!(a.len(), c.len());
        assert_eq!(a.len(), result.len());

        let len = a.len();

        if len < self.chunk_size * 2 {
            // Sequential fused operation
            self.sequential_fused_add_mul(a, b, c, result)?;
            return Ok(());
        }

        // Use rayon's parallel chunks for safe parallel mutation
        result
            .par_chunks_mut(self.chunk_size)
            .zip(a.par_chunks(self.chunk_size))
            .zip(b.par_chunks(self.chunk_size))
            .zip(c.par_chunks(self.chunk_size))
            .try_for_each(
                |(((chunk_result, chunk_a), chunk_b), chunk_c)| -> Result<(), VeloxxError> {
                    self.sequential_fused_add_mul(chunk_a, chunk_b, chunk_c, chunk_result)?;
                    Ok(())
                },
            )?;

        Ok(())
    }

    /// Sequential fused add-multiply operation with SIMD
    fn sequential_fused_add_mul(
        &self,
        a: &[f64],
        b: &[f64],
        c: &[f64],
        result: &mut [f64],
    ) -> Result<(), VeloxxError> {
        let len = a.len();

        #[cfg(all(feature = "simd", not(target_arch = "wasm32")))]
        {
            use wide::f64x4;

            let simd_len = len / 4;
            let remainder = len % 4;

            // SIMD fused operation: (a + b) * c
            for i in 0..simd_len {
                let a_vec = f64x4::from([a[i * 4], a[i * 4 + 1], a[i * 4 + 2], a[i * 4 + 3]]);
                let b_vec = f64x4::from([b[i * 4], b[i * 4 + 1], b[i * 4 + 2], b[i * 4 + 3]]);
                let c_vec = f64x4::from([c[i * 4], c[i * 4 + 1], c[i * 4 + 2], c[i * 4 + 3]]);

                let result_vec = (a_vec + b_vec) * c_vec;
                let result_array = result_vec.to_array();

                result[i * 4] = result_array[0];
                result[i * 4 + 1] = result_array[1];
                result[i * 4 + 2] = result_array[2];
                result[i * 4 + 3] = result_array[3];
            }

            // Handle remainder
            for i in (len - remainder)..len {
                result[i] = (a[i] + b[i]) * c[i];
            }
        }
        #[cfg(not(all(feature = "simd", not(target_arch = "wasm32"))))]
        {
            // Fallback implementation for WASM or when SIMD is not available
            for i in 0..len {
                result[i] = (a[i] + b[i]) * c[i];
            }
        }

        Ok(())
    }

    /// Adaptive chunk size based on data size and system characteristics
    pub fn adaptive_chunk_size(&self, data_size: usize) -> usize {
        let base_chunk = self.chunk_size;
        let thread_count = self.thread_count;

        // Adaptive sizing based on data size and thread count
        if data_size < base_chunk * thread_count {
            // Small data: fewer, larger chunks to reduce overhead
            std::cmp::max(data_size / thread_count, 1024)
        } else if data_size > base_chunk * thread_count * 8 {
            // Large data: more, smaller chunks for better load balancing
            base_chunk / 2
        } else {
            base_chunk
        }
    }

    /// NUMA-aware memory allocation and processing
    pub fn numa_aware_process<F>(&self, data_size: usize, processor: F) -> Result<(), VeloxxError>
    where
        F: Fn(usize, usize) -> Result<(), VeloxxError> + Sync + Send,
    {
        let chunk_size = self.adaptive_chunk_size(data_size);
        let num_chunks = data_size.div_ceil(chunk_size);

        // NUMA-aware parallel processing
        (0..num_chunks)
            .into_par_iter()
            .try_for_each(|chunk_idx| -> Result<(), VeloxxError> {
                let start = chunk_idx * chunk_size;
                let end = std::cmp::min(start + chunk_size, data_size);

                // Process chunk on current NUMA node
                processor(start, end)
            })?;

        Ok(())
    }
}

impl Default for AdvancedParallelExecutor {
    fn default() -> Self {
        Self::new()
    }
}

/// Global advanced parallel executor instance
static GLOBAL_EXECUTOR: std::sync::OnceLock<Arc<AdvancedParallelExecutor>> =
    std::sync::OnceLock::new();

/// Get the global advanced parallel executor
pub fn global_executor() -> &'static Arc<AdvancedParallelExecutor> {
    GLOBAL_EXECUTOR.get_or_init(|| Arc::new(AdvancedParallelExecutor::new()))
}

/// High-level parallel SIMD operations using the global executor
pub fn parallel_simd_add_advanced(
    a: &[f64],
    b: &[f64],
    result: &mut [f64],
) -> Result<(), VeloxxError> {
    global_executor().parallel_simd_add(a, b, result)
}

pub fn parallel_simd_mul_advanced(
    a: &[f64],
    b: &[f64],
    result: &mut [f64],
) -> Result<(), VeloxxError> {
    global_executor().parallel_simd_mul(a, b, result)
}

pub fn parallel_simd_sum_advanced(data: &[f64]) -> Result<f64, VeloxxError> {
    global_executor().parallel_simd_sum(data)
}

pub fn parallel_fused_add_mul_advanced(
    a: &[f64],
    b: &[f64],
    c: &[f64],
    result: &mut [f64],
) -> Result<(), VeloxxError> {
    global_executor().parallel_fused_add_mul(a, b, c, result)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_simd_add() {
        let executor = AdvancedParallelExecutor::new();
        let size = 10000;
        let a: Vec<f64> = (0..size).map(|i| i as f64).collect();
        let b: Vec<f64> = (0..size).map(|i| (i * 2) as f64).collect();
        let mut result = vec![0.0; size];

        executor.parallel_simd_add(&a, &b, &mut result).unwrap();

        for i in 0..size {
            assert_eq!(result[i], a[i] + b[i]);
        }
    }

    #[test]
    fn test_parallel_fused_add_mul() {
        let executor = AdvancedParallelExecutor::new();
        let size = 8192;
        let a: Vec<f64> = (0..size).map(|i| i as f64).collect();
        let b: Vec<f64> = (0..size).map(|i| (i + 1) as f64).collect();
        let c: Vec<f64> = (0..size).map(|_| 2.0).collect();
        let mut result = vec![0.0; size];

        executor
            .parallel_fused_add_mul(&a, &b, &c, &mut result)
            .unwrap();

        for i in 0..size {
            let expected = (a[i] + b[i]) * c[i];
            assert_eq!(result[i], expected);
        }
    }

    #[test]
    fn test_adaptive_chunk_size() {
        let executor = AdvancedParallelExecutor::new();

        // Small data should get larger chunks
        assert!(executor.adaptive_chunk_size(1000) >= 1024);

        // Medium data should get default chunks
        let medium_size = executor.chunk_size * executor.thread_count * 4;
        assert_eq!(
            executor.adaptive_chunk_size(medium_size),
            executor.chunk_size
        );

        // Large data should get smaller chunks
        let large_size = executor.chunk_size * executor.thread_count * 16;
        assert!(executor.adaptive_chunk_size(large_size) < executor.chunk_size);
    }
}