// scirs2_interpolate/parallel/mod.rs
1//! Parallel processing utilities for interpolation
2//!
3//! This module provides parallel implementations of computationally intensive
4//! interpolation operations. It enables efficient utilization of multi-core
5//! processors to accelerate interpolation tasks.
6//!
7//! The module includes:
8//!
9//! - Parallel versions of local interpolation methods
10//! - `ParallelMovingLeastSquares`: Parallel implementation of MLS interpolation
11//! - `ParallelLocalPolynomialRegression`: Parallel implementation of LOESS
12//! - Thread pool management for parallel operations
13//! - Utility functions for work distribution and threading
14//!
15//! Most parallel implementations in this module use Rayon, which provides
16//! work-stealing thread pools and parallel iterators.
17//!
18//! # Examples
19//!
20//! ```
21//! use scirs2_core::ndarray::{Array1, Array2, ArrayView2};
22//! use scirs2_interpolate::parallel::{
23//! ParallelMovingLeastSquares, ParallelConfig,
24//! ParallelEvaluate
25//! };
26//! use scirs2_interpolate::local::mls::{WeightFunction, PolynomialBasis};
27//!
28//! // Create sample data
29//! let points = Array2::from_shape_vec((5, 2), vec![
30//! 0.0, 0.0,
31//! 1.0, 0.0,
32//! 0.0, 1.0,
33//! 1.0, 1.0,
34//! 0.5, 0.5,
35//! ]).expect("doc example: should succeed");
36//! let values = Array1::from_vec(vec![0.0, 1.0, 1.0, 2.0, 1.5]);
37//!
38//! // Create parallel MLS interpolator
39//! let parallel_mls = ParallelMovingLeastSquares::new(
40//! points.clone(),
41//! values.clone(),
42//! WeightFunction::Gaussian,
43//! PolynomialBasis::Linear,
44//! 0.5, // bandwidth
45//! ).expect("doc example: should succeed");
46//!
47//! // Evaluate at multiple points in parallel
48//! let query_points = Array2::from_shape_vec((2, 2), vec![
49//! 0.25, 0.25,
50//! 0.75, 0.75,
51//! ]).expect("doc example: should succeed");
52//!
53//! let config = ParallelConfig::new();
54//! let results: Array1<f64> = parallel_mls.evaluate_parallel(&query_points.view(), &config).expect("doc example: should succeed");
55//! ```
56
57use scirs2_core::ndarray::{Array1, ArrayView2};
58use scirs2_core::numeric::Float;
59use scirs2_core::parallel_ops::*;
60use std::fmt::Debug;
61
62use crate::error::InterpolateResult;
63
/// Configuration for parallel execution.
///
/// Both fields default to `None` (via `Default`), which delegates the
/// choice of thread count and chunk size to the runtime heuristics.
#[derive(Debug, Clone, Copy, Default)]
pub struct ParallelConfig {
    /// Number of worker threads to use.
    /// If None, uses Rayon's default (usually the number of logical CPUs).
    /// NOTE(review): currently ignored by `init_thread_pool` — pool setup
    /// is handled globally by scirs2-core.
    pub n_workers: Option<usize>,

    /// Chunk size for parallel iterators.
    /// If None, a CPU-count-based heuristic is used (see `get_chunk_size`);
    /// Rayon may still subdivide further.
    pub chunk_size: Option<usize>,
}
75
76impl ParallelConfig {
77 /// Create a new ParallelConfig with default settings
78 pub fn new() -> Self {
79 Self::default()
80 }
81
82 /// Set the number of worker threads
83 pub fn with_workers(mut self, nworkers: usize) -> Self {
84 self.n_workers = Some(nworkers);
85 self
86 }
87
88 /// Set the chunk size for parallel iterators
89 pub fn with_chunk_size(mut self, chunksize: usize) -> Self {
90 self.chunk_size = Some(chunksize);
91 self
92 }
93
94 /// Thread pool initialization is now handled globally by scirs2-core
95 /// This method is kept for compatibility but no longer creates a new pool
96 pub fn init_thread_pool(&self) -> InterpolateResult<()> {
97 // Thread pool configuration is now handled globally by scirs2-core
98 // The n_workers parameter is preserved for future use but currently ignored
99 Ok(())
100 }
101
102 /// Get the chunk size to use for a given total size
103 pub fn get_chunk_size(&self, totalsize: usize) -> usize {
104 match self.chunk_size {
105 Some(_size) => _size,
106 None => {
107 // Choose a reasonable chunk _size based on total _size
108 // This is a heuristic and might need tuning for different workloads
109 let n_cpus = num_cpus::get();
110 let min_chunks_per_cpu = 4; // Ensure at least 4 chunks per CPU for load balancing
111
112 std::cmp::max(1, totalsize / (n_cpus * min_chunks_per_cpu))
113 }
114 }
115 }
116}
117
/// Trait for functions that can be parallelized.
pub trait Parallelizable<T, R> {
    /// Execute the function in parallel on a batch of inputs,
    /// returning one result per input, in input order.
    fn execute_parallel(&self, inputs: &[T], config: &ParallelConfig) -> Vec<R>;
}
123
124/// Implementation for any function that can be applied to a single input
125impl<T, R, F> Parallelizable<T, R> for F
126where
127 F: Fn(&T) -> R + Sync,
128 T: Sync,
129 R: Send,
130{
131 fn execute_parallel(&self, inputs: &[T], config: &ParallelConfig) -> Vec<R> {
132 let chunk_size = config.get_chunk_size(inputs.len());
133
134 inputs
135 .par_iter()
136 .with_min_len(chunk_size)
137 .map(self)
138 .collect()
139 }
140}
141
/// Trait for types that support parallel evaluation.
pub trait ParallelEvaluate<F: Float, O> {
    /// Evaluate at multiple points in parallel.
    ///
    /// # Arguments
    ///
    /// * `points` - Query points, one per row (see module-level example)
    /// * `config` - Parallel execution configuration
    ///
    /// # Errors
    ///
    /// Returns an `InterpolateResult` error if evaluation fails.
    fn evaluate_parallel(
        &self,
        points: &ArrayView2<F>,
        config: &ParallelConfig,
    ) -> InterpolateResult<O>;
}
151
/// Trait for interpolators that can make batch predictions in parallel.
pub trait ParallelPredict<F: Float> {
    /// Make predictions at multiple points in parallel.
    ///
    /// # Arguments
    ///
    /// * `points` - Query points, one per row
    /// * `config` - Parallel execution configuration
    ///
    /// # Errors
    ///
    /// Returns an `InterpolateResult` error if prediction fails; on success
    /// the `Array1` holds one value per query point.
    fn predict_parallel(
        &self,
        points: &ArrayView2<F>,
        config: &ParallelConfig,
    ) -> InterpolateResult<Array1<F>>;
}
161
162/// Helper function to estimate optimal chunk size
163///
164/// This function determines a reasonable chunk size for parallel processing
165/// based on the total size and the computational cost of each item.
166///
167/// # Arguments
168///
169/// * `total_size` - Total number of items to process
170/// * `costfactor` - Relative computational cost per item (higher = more expensive)
171/// * `config` - Parallel configuration
172///
173/// # Returns
174///
175/// The recommended chunk size
176#[allow(dead_code)]
177pub fn estimate_chunk_size(total_size: usize, costfactor: f64, config: &ParallelConfig) -> usize {
178 // If chunk _size is explicitly specified, use that
179 if let Some(size) = config.chunk_size {
180 return size;
181 }
182
183 // Otherwise, compute a reasonable chunk _size
184 let n_cpus = match config.n_workers {
185 Some(n) => n,
186 None => num_cpus::get(),
187 };
188
189 // Base estimate on desired chunks per CPU
190 let desired_chunks_per_cpu = if costfactor > 10.0 {
191 // Expensive operations - fewer, larger chunks
192 2
193 } else if costfactor > 1.0 {
194 // Moderately expensive - balanced
195 4
196 } else {
197 // Cheap operations - more, smaller chunks for better load balancing
198 8
199 };
200
201 let base_chunk_size = std::cmp::max(1, total_size / (n_cpus * desired_chunks_per_cpu));
202
203 // Apply cost _factor adjustment
204 let adjusted_size = (base_chunk_size as f64 * costfactor.sqrt()).ceil() as usize;
205
206 // Ensure a reasonable bound
207 std::cmp::min(total_size, std::cmp::max(1, adjusted_size))
208}
209
/// Create evenly distributed indices for parallel tasks
///
/// This function partitions a range of indices into approximately
/// equal-sized chunks for distribution across worker threads. The first
/// `total_size % n_parts` partitions receive one extra item each, so
/// partition sizes differ by at most one.
///
/// # Arguments
///
/// * `total_size` - Total number of items
/// * `n_parts` - Number of partitions to create (capped at `total_size`)
///
/// # Returns
///
/// Vector of half-open `(start, end)` index pairs, one per partition;
/// empty if either argument is zero.
#[allow(dead_code)]
pub fn create_index_ranges(total_size: usize, n_parts: usize) -> Vec<(usize, usize)> {
    if total_size == 0 || n_parts == 0 {
        return Vec::new();
    }

    // Never create more partitions than there are items.
    let n_parts = std::cmp::min(n_parts, total_size);
    let chunk_size = total_size / n_parts;
    let remainder = total_size % n_parts;

    let mut start = 0;
    (0..n_parts)
        .map(|i| {
            // The leading `remainder` partitions each absorb one extra item.
            let end = start + chunk_size + usize::from(i < remainder);
            let range = (start, end);
            start = end;
            range
        })
        .collect()
}
247
248pub mod loess;
249pub mod mls;
250
251pub use loess::{
252 make_parallel_loess, make_parallel_robust_loess, ParallelLocalPolynomialRegression,
253};
254pub use mls::{make_parallel_mls, ParallelMovingLeastSquares};
255
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_execution() {
        // Squaring every element in parallel must preserve input order.
        let inputs: Vec<i32> = (1..=10).collect();
        let config = ParallelConfig::new();

        let square = |x: &i32| x * x;
        let squared = square.execute_parallel(&inputs, &config);

        let expected: Vec<i32> = (1..=10).map(|x| x * x).collect();
        assert_eq!(squared, expected);
    }

    #[test]
    fn test_index_ranges() {
        // Evenly divisible size: all partitions equal.
        assert_eq!(
            create_index_ranges(10, 5),
            vec![(0, 2), (2, 4), (4, 6), (6, 8), (8, 10)]
        );

        // Remainder present: leading partitions take the extra items.
        assert_eq!(create_index_ranges(11, 3), vec![(0, 4), (4, 8), (8, 11)]);

        // More parts than items: capped at one item per partition.
        assert_eq!(create_index_ranges(3, 5), vec![(0, 1), (1, 2), (2, 3)]);
    }

    #[test]
    fn test_chunk_size_estimation() {
        let config = ParallelConfig::new();

        // A higher cost factor must never shrink the estimated chunk size.
        let cheap = estimate_chunk_size(1000, 0.5, &config);
        let moderate = estimate_chunk_size(1000, 5.0, &config);
        let expensive = estimate_chunk_size(1000, 20.0, &config);
        assert!(expensive >= moderate);
        assert!(moderate >= cheap);

        // An explicitly configured chunk size is returned verbatim.
        let explicit = ParallelConfig::new().with_chunk_size(42);
        assert_eq!(estimate_chunk_size(1000, 1.0, &explicit), 42);
    }
}
308}