// scirs2_interpolate/parallel/mod.rs
//! Parallel processing utilities for interpolation
//!
//! This module provides parallel implementations of computationally intensive
//! interpolation operations. It enables efficient utilization of multi-core
//! processors to accelerate interpolation tasks.
//!
//! The module includes:
//!
//! - Parallel versions of local interpolation methods
//!   - `ParallelMovingLeastSquares`: Parallel implementation of MLS interpolation
//!   - `ParallelLocalPolynomialRegression`: Parallel implementation of LOESS
//! - Thread pool management for parallel operations
//! - Utility functions for work distribution and threading
//!
//! Most parallel implementations in this module use Rayon, which provides
//! work-stealing thread pools and parallel iterators.
//!
//! # Examples
//!
//! ```
//! use scirs2_core::ndarray::{Array1, Array2, ArrayView2};
//! use scirs2_interpolate::parallel::{
//!     ParallelMovingLeastSquares, ParallelConfig,
//!     ParallelEvaluate
//! };
//! use scirs2_interpolate::local::mls::{WeightFunction, PolynomialBasis};
//!
//! // Create sample data
//! let points = Array2::from_shape_vec((5, 2), vec![
//!     0.0, 0.0,
//!     1.0, 0.0,
//!     0.0, 1.0,
//!     1.0, 1.0,
//!     0.5, 0.5,
//! ]).expect("doc example: should succeed");
//! let values = Array1::from_vec(vec![0.0, 1.0, 1.0, 2.0, 1.5]);
//!
//! // Create parallel MLS interpolator
//! let parallel_mls = ParallelMovingLeastSquares::new(
//!     points.clone(),
//!     values.clone(),
//!     WeightFunction::Gaussian,
//!     PolynomialBasis::Linear,
//!     0.5, // bandwidth
//! ).expect("doc example: should succeed");
//!
//! // Evaluate at multiple points in parallel
//! let query_points = Array2::from_shape_vec((2, 2), vec![
//!     0.25, 0.25,
//!     0.75, 0.75,
//! ]).expect("doc example: should succeed");
//!
//! let config = ParallelConfig::new();
//! let results: Array1<f64> = parallel_mls.evaluate_parallel(&query_points.view(), &config).expect("doc example: should succeed");
//! ```

use std::fmt::Debug;

use scirs2_core::ndarray::{Array1, ArrayView2};
use scirs2_core::numeric::Float;
use scirs2_core::parallel_ops::*;

use crate::error::InterpolateResult;
/// Configuration for parallel execution.
///
/// Both fields default to `None`, meaning "let Rayon decide". Construct
/// via [`ParallelConfig::new`] and customize with the builder methods
/// `with_workers` / `with_chunk_size`.
#[derive(Debug, Clone, Copy, Default)]
pub struct ParallelConfig {
    /// Number of worker threads to use.
    /// If `None`, uses Rayon's default (usually the number of logical CPUs).
    pub n_workers: Option<usize>,

    /// Chunk size for parallel iterators.
    /// If `None`, Rayon chooses automatically.
    pub chunk_size: Option<usize>,
}
75
76impl ParallelConfig {
77    /// Create a new ParallelConfig with default settings
78    pub fn new() -> Self {
79        Self::default()
80    }
81
82    /// Set the number of worker threads
83    pub fn with_workers(mut self, nworkers: usize) -> Self {
84        self.n_workers = Some(nworkers);
85        self
86    }
87
88    /// Set the chunk size for parallel iterators
89    pub fn with_chunk_size(mut self, chunksize: usize) -> Self {
90        self.chunk_size = Some(chunksize);
91        self
92    }
93
94    /// Thread pool initialization is now handled globally by scirs2-core
95    /// This method is kept for compatibility but no longer creates a new pool
96    pub fn init_thread_pool(&self) -> InterpolateResult<()> {
97        // Thread pool configuration is now handled globally by scirs2-core
98        // The n_workers parameter is preserved for future use but currently ignored
99        Ok(())
100    }
101
102    /// Get the chunk size to use for a given total size
103    pub fn get_chunk_size(&self, totalsize: usize) -> usize {
104        match self.chunk_size {
105            Some(_size) => _size,
106            None => {
107                // Choose a reasonable chunk _size based on total _size
108                // This is a heuristic and might need tuning for different workloads
109                let n_cpus = num_cpus::get();
110                let min_chunks_per_cpu = 4; // Ensure at least 4 chunks per CPU for load balancing
111
112                std::cmp::max(1, totalsize / (n_cpus * min_chunks_per_cpu))
113            }
114        }
115    }
116}
117
/// Trait for functions that can be parallelized over a batch of inputs.
pub trait Parallelizable<T, R> {
    /// Execute the function in parallel on a batch of inputs,
    /// returning one result per input, in input order.
    fn execute_parallel(&self, inputs: &[T], config: &ParallelConfig) -> Vec<R>;
}
123
124/// Implementation for any function that can be applied to a single input
125impl<T, R, F> Parallelizable<T, R> for F
126where
127    F: Fn(&T) -> R + Sync,
128    T: Sync,
129    R: Send,
130{
131    fn execute_parallel(&self, inputs: &[T], config: &ParallelConfig) -> Vec<R> {
132        let chunk_size = config.get_chunk_size(inputs.len());
133
134        inputs
135            .par_iter()
136            .with_min_len(chunk_size)
137            .map(self)
138            .collect()
139    }
140}
141
/// Trait for types that support parallel evaluation at many query points.
pub trait ParallelEvaluate<F: Float, O> {
    /// Evaluate at multiple points in parallel.
    ///
    /// `points` is a 2-D view where each row is one query point;
    /// `config` controls worker count and chunking.
    fn evaluate_parallel(
        &self,
        points: &ArrayView2<F>,
        config: &ParallelConfig,
    ) -> InterpolateResult<O>;
}
151
/// Trait for interpolators that can make batch predictions in parallel.
pub trait ParallelPredict<F: Float> {
    /// Make predictions at multiple points in parallel.
    ///
    /// `points` is a 2-D view where each row is one query point; returns
    /// one predicted value per row.
    fn predict_parallel(
        &self,
        points: &ArrayView2<F>,
        config: &ParallelConfig,
    ) -> InterpolateResult<Array1<F>>;
}
161
162/// Helper function to estimate optimal chunk size
163///
164/// This function determines a reasonable chunk size for parallel processing
165/// based on the total size and the computational cost of each item.
166///
167/// # Arguments
168///
169/// * `total_size` - Total number of items to process
170/// * `costfactor` - Relative computational cost per item (higher = more expensive)
171/// * `config` - Parallel configuration
172///
173/// # Returns
174///
175/// The recommended chunk size
176#[allow(dead_code)]
177pub fn estimate_chunk_size(total_size: usize, costfactor: f64, config: &ParallelConfig) -> usize {
178    // If chunk _size is explicitly specified, use that
179    if let Some(size) = config.chunk_size {
180        return size;
181    }
182
183    // Otherwise, compute a reasonable chunk _size
184    let n_cpus = match config.n_workers {
185        Some(n) => n,
186        None => num_cpus::get(),
187    };
188
189    // Base estimate on desired chunks per CPU
190    let desired_chunks_per_cpu = if costfactor > 10.0 {
191        // Expensive operations - fewer, larger chunks
192        2
193    } else if costfactor > 1.0 {
194        // Moderately expensive - balanced
195        4
196    } else {
197        // Cheap operations - more, smaller chunks for better load balancing
198        8
199    };
200
201    let base_chunk_size = std::cmp::max(1, total_size / (n_cpus * desired_chunks_per_cpu));
202
203    // Apply cost _factor adjustment
204    let adjusted_size = (base_chunk_size as f64 * costfactor.sqrt()).ceil() as usize;
205
206    // Ensure a reasonable bound
207    std::cmp::min(total_size, std::cmp::max(1, adjusted_size))
208}
209
/// Create evenly distributed indices for parallel tasks
///
/// Partitions the index range `0..total_size` into approximately
/// equal-sized, contiguous chunks for distribution across worker threads.
/// When `total_size` is not evenly divisible, the leading partitions each
/// receive one extra item.
///
/// # Arguments
///
/// * `total_size` - Total number of items
/// * `nparts` - Number of partitions to create (capped at `total_size`)
///
/// # Returns
///
/// Vector of half-open `(start, end)` index pairs, one per partition;
/// empty if either argument is zero
#[allow(dead_code)]
pub fn create_index_ranges(total_size: usize, nparts: usize) -> Vec<(usize, usize)> {
    if total_size == 0 || nparts == 0 {
        return Vec::new();
    }

    // Never create more partitions than there are items.
    let parts = std::cmp::min(nparts, total_size);
    let base = total_size / parts;
    let remainder = total_size % parts;

    // Walk forward, giving the first `remainder` partitions one extra item.
    let mut start = 0;
    (0..parts)
        .map(|i| {
            let len = base + usize::from(i < remainder);
            let range = (start, start + len);
            start += len;
            range
        })
        .collect()
}
247
// Submodules providing the concrete parallel interpolators.
pub mod loess;
pub mod mls;

// Re-export the submodules' public API at the `parallel` module root.
pub use loess::{
    make_parallel_loess, make_parallel_robust_loess, ParallelLocalPolynomialRegression,
};
pub use mls::{make_parallel_mls, ParallelMovingLeastSquares};
255
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_execution() {
        // Squaring through the blanket `Parallelizable` impl must
        // preserve input order.
        let inputs = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let square = |x: &i32| x * x;

        let output = square.execute_parallel(&inputs, &ParallelConfig::new());

        assert_eq!(output, vec![1, 4, 9, 16, 25, 36, 49, 64, 81, 100]);
    }

    #[test]
    fn test_index_ranges() {
        // Evenly divisible size.
        assert_eq!(
            create_index_ranges(10, 5),
            vec![(0, 2), (2, 4), (4, 6), (6, 8), (8, 10)]
        );

        // Remainder is spread over the leading partitions.
        assert_eq!(create_index_ranges(11, 3), vec![(0, 4), (4, 8), (8, 11)]);

        // More parts than items collapses to one item per part.
        assert_eq!(create_index_ranges(3, 5), vec![(0, 1), (1, 2), (2, 3)]);
    }

    #[test]
    fn test_chunk_size_estimation() {
        let config = ParallelConfig::new();

        // A larger cost factor must never shrink the chunk size.
        let cheap = estimate_chunk_size(1000, 0.5, &config);
        let moderate = estimate_chunk_size(1000, 5.0, &config);
        let expensive = estimate_chunk_size(1000, 20.0, &config);

        assert!(expensive >= moderate);
        assert!(moderate >= cheap);

        // An explicit chunk size is honored verbatim.
        let explicit = ParallelConfig::new().with_chunk_size(42);
        assert_eq!(estimate_chunk_size(1000, 1.0, &explicit), 42);
    }
}