use scirs2_core::ndarray::{Array1, ArrayView2};
use scirs2_core::numeric::Float;
use scirs2_core::parallel_ops::*;
use std::fmt::Debug;
use crate::error::InterpolateResult;
/// Configuration for parallel execution of interpolation workloads.
///
/// Both fields are optional; `None` means "derive a sensible value at
/// runtime" (see `get_chunk_size` / `estimate_chunk_size`).
#[derive(Debug, Clone, Copy, Default)]
pub struct ParallelConfig {
/// Requested number of worker threads; `None` = use the runtime default.
pub n_workers: Option<usize>,
/// Explicit work-chunk size; `None` = compute from input size and CPU count.
pub chunk_size: Option<usize>,
}
impl ParallelConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_workers(mut self, nworkers: usize) -> Self {
self.n_workers = Some(nworkers);
self
}
pub fn with_chunk_size(mut self, chunksize: usize) -> Self {
self.chunk_size = Some(chunksize);
self
}
pub fn init_thread_pool(&self) -> InterpolateResult<()> {
Ok(())
}
pub fn get_chunk_size(&self, totalsize: usize) -> usize {
match self.chunk_size {
Some(_size) => _size,
None => {
let n_cpus = num_cpus::get();
let min_chunks_per_cpu = 4;
std::cmp::max(1, totalsize / (n_cpus * min_chunks_per_cpu))
}
}
}
}
/// A computation that can be mapped over a slice of inputs in parallel.
///
/// Implementors produce one `R` per input `T`; the blanket impl below
/// preserves input order in the returned vector.
pub trait Parallelizable<T, R> {
/// Applies the computation to every element of `inputs`, splitting the
/// work into chunks sized via `config`.
fn execute_parallel(&self, inputs: &[T], config: &ParallelConfig) -> Vec<R>;
}
/// Blanket implementation: any thread-safe `Fn(&T) -> R` can be executed in
/// parallel over a slice, yielding results in input order.
impl<T, R, F> Parallelizable<T, R> for F
where
    F: Fn(&T) -> R + Sync,
    T: Sync,
    R: Send,
{
    fn execute_parallel(&self, inputs: &[T], config: &ParallelConfig) -> Vec<R> {
        // Ask the config how coarsely to split the work, then let the
        // parallel iterator schedule chunks of at least that length.
        let min_len = config.get_chunk_size(inputs.len());
        inputs
            .par_iter()
            .with_min_len(min_len)
            .map(|item| self(item))
            .collect()
    }
}
/// Parallel evaluation of an interpolant over a batch of query points.
///
/// `points` is a 2-D view of query locations — presumably one point per row
/// with coordinates along the second axis; confirm against implementors.
pub trait ParallelEvaluate<F: Float, O> {
/// Evaluates at every point in `points`, splitting work per `config`.
/// Returns the collected output `O` or an interpolation error.
fn evaluate_parallel(
&self,
points: &ArrayView2<F>,
config: &ParallelConfig,
) -> InterpolateResult<O>;
}
/// Parallel prediction producing one scalar value per query point.
///
/// Like [`ParallelEvaluate`] but with the output fixed to a 1-D array —
/// presumably one prediction per row of `points`; confirm with implementors.
pub trait ParallelPredict<F: Float> {
/// Predicts a value for every point in `points`, splitting work per
/// `config`. Returns a 1-D array of predictions or an error.
fn predict_parallel(
&self,
points: &ArrayView2<F>,
config: &ParallelConfig,
) -> InterpolateResult<Array1<F>>;
}
#[allow(dead_code)]
pub fn estimate_chunk_size(total_size: usize, costfactor: f64, config: &ParallelConfig) -> usize {
if let Some(size) = config.chunk_size {
return size;
}
let n_cpus = match config.n_workers {
Some(n) => n,
None => num_cpus::get(),
};
let desired_chunks_per_cpu = if costfactor > 10.0 {
2
} else if costfactor > 1.0 {
4
} else {
8
};
let base_chunk_size = std::cmp::max(1, total_size / (n_cpus * desired_chunks_per_cpu));
let adjusted_size = (base_chunk_size as f64 * costfactor.sqrt()).ceil() as usize;
std::cmp::min(total_size, std::cmp::max(1, adjusted_size))
}
/// Splits the index range `0..total_size` into at most `nparts` contiguous,
/// non-overlapping half-open `(start, end)` ranges of near-equal length.
///
/// The first `total_size % parts` ranges are one element longer, so lengths
/// differ by at most one. Never produces an empty range (the part count is
/// capped at `total_size`). Returns an empty vector if either argument is 0.
#[allow(dead_code)]
pub fn create_index_ranges(total_size: usize, nparts: usize) -> Vec<(usize, usize)> {
    if total_size == 0 || nparts == 0 {
        return Vec::new();
    }
    // Never create more parts than there are items.
    let parts = nparts.min(total_size);
    let base_len = total_size / parts;
    let longer_parts = total_size % parts; // these parts get one extra element
    let mut next_start = 0;
    (0..parts)
        .map(|part| {
            let len = base_len + usize::from(part < longer_parts);
            let range = (next_start, next_start + len);
            next_start = range.1;
            range
        })
        .collect()
}
pub mod loess;
pub mod mls;
pub use loess::{
make_parallel_loess, make_parallel_robust_loess, ParallelLocalPolynomialRegression,
};
pub use mls::{make_parallel_mls, ParallelMovingLeastSquares};
#[cfg(test)]
mod tests {
    use super::*;

    /// A pure closure squared over a slice must come back in input order.
    #[test]
    fn test_parallel_execution() {
        let values: Vec<i32> = (1..=10).collect();
        let cfg = ParallelConfig::new();
        let square = |v: &i32| v * v;
        let expected: Vec<i32> = values.iter().map(|v| v * v).collect();
        assert_eq!(square.execute_parallel(&values, &cfg), expected);
    }

    /// Ranges must tile the index space contiguously with near-equal sizes.
    #[test]
    fn test_index_ranges() {
        assert_eq!(
            create_index_ranges(10, 5),
            vec![(0, 2), (2, 4), (4, 6), (6, 8), (8, 10)]
        );
        assert_eq!(create_index_ranges(11, 3), vec![(0, 4), (4, 8), (8, 11)]);
        assert_eq!(create_index_ranges(3, 5), vec![(0, 1), (1, 2), (2, 3)]);
    }

    /// Costlier per-item work must never shrink the estimated chunk, and an
    /// explicitly configured chunk size must be returned verbatim.
    #[test]
    fn test_chunk_size_estimation() {
        let cfg = ParallelConfig::new();
        let cheap = estimate_chunk_size(1000, 0.5, &cfg);
        let moderate = estimate_chunk_size(1000, 5.0, &cfg);
        let expensive = estimate_chunk_size(1000, 20.0, &cfg);
        assert!(cheap <= moderate && moderate <= expensive);

        let explicit = ParallelConfig::new().with_chunk_size(42);
        assert_eq!(estimate_chunk_size(1000, 1.0, &explicit), 42);
    }
}