use crate::error::{StatsError, StatsResult};
use crate::error_standardization::ErrorMessages;
use crate::simd_enhanced_core::{mean_enhanced, variance_enhanced, ComprehensiveStats};
use crossbeam;
use scirs2_core::ndarray::{Array1, Array2, ArrayBase, ArrayView1, Data, Ix1, Ix2};
use scirs2_core::numeric::{Float, NumCast, One, Zero};
use scirs2_core::{
    parallel_ops::*,
    simd_ops::{PlatformCapabilities, SimdUnifiedOps},
};
use std::collections::VecDeque;
use std::sync::{atomic::AtomicUsize, Arc, Mutex};
use std::thread;

/// Configuration for advanced parallel statistical processing.
#[derive(Debug, Clone)]
pub struct AdvancedParallelConfig {
    /// Minimum number of elements before a parallel path is used.
    pub parallel_threshold: usize,
    /// Number of worker threads (`None` means detect automatically).
    pub num_threads: Option<usize>,
    /// Whether to prefer NUMA-aware data placement.
    pub numa_aware: bool,
    /// Whether work stealing between threads is enabled.
    pub work_stealing: bool,
    /// Strategy used to split inputs into chunks.
    pub chunk_strategy: ChunkStrategy,
    /// Upper bound on memory used for intermediate buffers, in bytes.
    pub max_memory_usage: usize,
}

impl Default for AdvancedParallelConfig {
    fn default() -> Self {
        Self {
            parallel_threshold: 10_000,
            num_threads: None,
            numa_aware: true,
            work_stealing: true,
            chunk_strategy: ChunkStrategy::Adaptive,
            max_memory_usage: 1024 * 1024 * 1024, // 1 GiB
        }
    }
}

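// Illustrative sketch: overriding selected fields of the default
// configuration. The values below are assumptions for demonstration,
// not tuned recommendations.
//
// let config = AdvancedParallelConfig {
//     parallel_threshold: 50_000,
//     num_threads: Some(8),
//     chunk_strategy: ChunkStrategy::Fixed(4096),
//     ..AdvancedParallelConfig::default()
// };
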
/// Strategy used to partition data across worker threads.
#[derive(Debug, Clone, Copy)]
pub enum ChunkStrategy {
    /// Fixed chunk size, in elements.
    Fixed(usize),
    /// Chunk size derived from cache-size heuristics.
    CacheOptimal,
    /// Chunk size adapted to the input length and cache sizes.
    Adaptive,
    /// Dynamically sized chunks served from a shared work queue.
    WorkStealing,
}

/// Parallel processor for statistical computations with SIMD-aware chunking.
pub struct AdvancedParallelProcessor<F: Float + std::fmt::Display> {
    config: AdvancedParallelConfig,
    capabilities: PlatformCapabilities,
    #[allow(dead_code)]
    thread_pool: Option<ThreadPool>,
    #[allow(dead_code)]
    work_queue: Arc<Mutex<VecDeque<ParallelTask<F>>>>,
    #[allow(dead_code)]
    active_workers: Arc<AtomicUsize>,
}

/// Units of work that can be queued for background execution.
enum ParallelTask<F: Float + std::fmt::Display> {
    Mean(Vec<F>),
    Variance(Vec<F>, F, usize),
    Correlation(Vec<F>, Vec<F>),
    Histogram(Vec<F>, usize),
}

/// Results corresponding to [`ParallelTask`] variants.
pub enum ParallelResult<F: Float + std::fmt::Display> {
    Mean(F),
    Variance(F),
    Correlation(F),
    Histogram(Vec<usize>),
}

impl<F> AdvancedParallelProcessor<F>
where
    F: Float
        + NumCast
        + Send
        + Sync
        + SimdUnifiedOps
        + Copy
        + 'static
        + Zero
        + One
        + std::fmt::Debug
        + std::fmt::Display
        + std::iter::Sum<F>,
{
    /// Creates a new processor with the given configuration.
    pub fn new(config: AdvancedParallelConfig) -> Self {
        let capabilities = PlatformCapabilities::detect();

        Self {
            config,
            capabilities,
            thread_pool: None,
            work_queue: Arc::new(Mutex::new(VecDeque::new())),
            active_workers: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Initializes the internal thread pool.
    pub fn initialize(&mut self) -> StatsResult<()> {
        let num_threads = self
            .config
            .num_threads
            .unwrap_or_else(|| self.optimal_thread_count());

        self.thread_pool = Some(ThreadPool::new(num_threads, self.config.clone())?);
        Ok(())
    }

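    /// Computes the arithmetic mean, dispatching to a parallel chunking
    /// strategy for inputs at or above `parallel_threshold`.
    ///
    /// Illustrative usage (a sketch; the `?` operator assumes a
    /// `StatsResult` context):
    ///
    /// ```ignore
    /// let data = Array1::from_iter((0..1_000_000).map(|i| i as f64));
    /// let processor = create_advanced_parallel_processor::<f64>();
    /// let mean = processor.mean_parallel_advanced(&data)?;
    /// ```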
    pub fn mean_parallel_advanced<D>(&self, x: &ArrayBase<D, Ix1>) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        if x.is_empty() {
            return Err(ErrorMessages::empty_array("x"));
        }

        let n = x.len();

        // Below the threshold, the SIMD-enhanced sequential path wins.
        if n < self.config.parallel_threshold {
            return mean_enhanced(x);
        }

        match self.config.chunk_strategy {
            ChunkStrategy::WorkStealing => self.mean_work_stealing(x),
            ChunkStrategy::Adaptive => self.mean_adaptive_chunking(x),
            ChunkStrategy::CacheOptimal => self.mean_cache_optimal(x),
            ChunkStrategy::Fixed(chunksize) => self.mean_fixed_chunks(x, chunksize),
        }
    }

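    /// Computes the variance with `ddof` delta degrees of freedom. Large
    /// inputs run Welford's algorithm per chunk and merge the partial
    /// (mean, M2) pairs with Chan's pairwise formula.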
    pub fn variance_parallel_advanced<D>(
        &self,
        x: &ArrayBase<D, Ix1>,
        ddof: usize,
    ) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let n = x.len();
        if n == 0 {
            return Err(ErrorMessages::empty_array("x"));
        }
        if n <= ddof {
            return Err(ErrorMessages::insufficientdata(
                "variance calculation",
                ddof + 1,
                n,
            ));
        }

        if n < self.config.parallel_threshold {
            return variance_enhanced(x, ddof);
        }

        self.variance_welford_parallel(x, ddof)
    }

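    /// Computes the Pearson correlation matrix of `data` (rows are samples,
    /// columns are features). Only the upper triangle is computed directly;
    /// it is mirrored into the lower triangle afterwards.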
    pub fn correlation_matrix_parallel<D>(&self, data: &ArrayBase<D, Ix2>) -> StatsResult<Array2<F>>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let (n_samples_, n_features) = data.dim();

        if n_samples_ == 0 {
            return Err(ErrorMessages::empty_array("data"));
        }
        if n_features == 0 {
            return Err(ErrorMessages::insufficientdata(
                "correlation matrix",
                2,
                n_features,
            ));
        }

        let mut correlation_matrix = Array2::eye(n_features);

        if n_features > 4 && n_samples_ > self.config.parallel_threshold {
            self.correlation_matrix_parallel_upper_triangle(data, &mut correlation_matrix)?;
        } else {
            self.correlation_matrix_sequential(data, &mut correlation_matrix)?;
        }

        // Mirror the computed upper triangle into the lower triangle.
        for i in 0..n_features {
            for j in 0..i {
                correlation_matrix[[i, j]] = correlation_matrix[[j, i]];
            }
        }

        Ok(correlation_matrix)
    }

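    /// Computes mean, variance, standard deviation, skewness, and excess
    /// kurtosis in one parallel pass, merging per-chunk central moments.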
    pub fn batch_statistics_parallel<D>(
        &self,
        x: &ArrayBase<D, Ix1>,
        ddof: usize,
    ) -> StatsResult<ComprehensiveStats<F>>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let n = x.len();
        if n == 0 {
            return Err(ErrorMessages::empty_array("x"));
        }
        if n <= ddof {
            return Err(ErrorMessages::insufficientdata(
                "comprehensive statistics",
                ddof + 1,
                n,
            ));
        }

        if n < self.config.parallel_threshold {
            return crate::simd_enhanced_core::comprehensive_stats_simd(x, ddof);
        }

        self.comprehensive_stats_single_pass_parallel(x, ddof)
    }

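    /// Draws `n_samples_` bootstrap resamples of `x` and evaluates
    /// `statistic_fn` on each, splitting the work across threads.
    ///
    /// Illustrative usage (a sketch):
    ///
    /// ```ignore
    /// let boot_means = processor.bootstrap_parallel(
    ///     &data,
    ///     1_000,
    ///     |sample| sample.mean().unwrap_or(0.0),
    ///     Some(42), // fixed seed for reproducibility
    /// )?;
    /// ```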
    pub fn bootstrap_parallel<D>(
        &self,
        x: &ArrayBase<D, Ix1>,
        n_samples_: usize,
        statistic_fn: impl Fn(&ArrayView1<F>) -> F + Send + Sync + Clone,
        seed: Option<u64>,
    ) -> StatsResult<Array1<F>>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        if x.is_empty() {
            return Err(ErrorMessages::empty_array("x"));
        }
        if n_samples_ == 0 {
            return Err(ErrorMessages::insufficientdata("bootstrap", 1, 0));
        }

        let num_threads = self
            .config
            .num_threads
            .unwrap_or_else(|| self.optimal_thread_count());
        let samples_per_thread = n_samples_.div_ceil(num_threads);

        self.bootstrap_work_stealing(x, n_samples_, samples_per_thread, statistic_fn, seed)
    }

    fn optimal_thread_count(&self) -> usize {
        let logical_cores = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);

        // Heuristic: on SMT systems, one thread per physical core tends to
        // perform better for memory-bound statistics, so use half of the
        // logical cores when more than two are available.
        if logical_cores > 2 {
            logical_cores / 2
        } else {
            logical_cores
        }
    }

    fn mean_work_stealing<D>(&self, x: &ArrayBase<D, Ix1>) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let n = x.len();
        let num_threads = self
            .config
            .num_threads
            .unwrap_or_else(|| self.optimal_thread_count());
        let initial_chunksize = n.div_ceil(num_threads);

        // Seed the shared queue with one chunk per thread.
        let work_queue: Arc<Mutex<VecDeque<(usize, usize)>>> =
            Arc::new(Mutex::new(VecDeque::new()));

        for i in 0..num_threads {
            let start = i * initial_chunksize;
            let end = ((i + 1) * initial_chunksize).min(n);
            if start < end {
                work_queue.lock().unwrap().push_back((start, end));
            }
        }

        let partial_sums: Arc<Mutex<Vec<F>>> = Arc::new(Mutex::new(Vec::new()));
        let data_slice = x
            .as_slice()
            .ok_or(StatsError::InvalidInput("Data not contiguous".to_string()))?;

        crossbeam::scope(|s| {
            for _ in 0..num_threads {
                let work_queue = Arc::clone(&work_queue);
                let partial_sums = Arc::clone(&partial_sums);

                s.spawn(move |_| {
                    let mut local_sum = F::zero();

                    while let Some((start, mut end)) = work_queue.lock().unwrap().pop_front() {
                        // Split large chunks and push the second half back so
                        // idle workers can steal it; only the first half is
                        // summed here, so no element is counted twice.
                        if end - start > 1000 {
                            let mid = (start + end) / 2;
                            work_queue.lock().unwrap().push_back((mid, end));
                            end = mid;
                        }

                        for &val in &data_slice[start..end] {
                            local_sum = local_sum + val;
                        }
                    }

                    partial_sums.lock().unwrap().push(local_sum);
                });
            }
        })
        .unwrap();

        let total_sum = partial_sums
            .lock()
            .unwrap()
            .iter()
            .fold(F::zero(), |acc, &val| acc + val);
        Ok(total_sum / F::from(n).unwrap())
    }

    fn mean_adaptive_chunking<D>(&self, x: &ArrayBase<D, Ix1>) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let n = x.len();
        let elementsize = std::mem::size_of::<F>();

        let l1_cache = 32 * 1024; // typical L1 data cache, in bytes
        let l2_cache = 256 * 1024; // typical L2 cache, in bytes

        let chunksize = if n * elementsize <= l1_cache {
            n // everything fits in L1: process as a single chunk
        } else if n * elementsize <= l2_cache {
            l1_cache / elementsize // chunk to L1 capacity
        } else {
            l2_cache / elementsize // chunk to L2 capacity
        };

        let num_chunks = n.div_ceil(chunksize);

        let chunks: Vec<_> = (0..num_chunks)
            .map(|i| {
                let start = i * chunksize;
                let end = ((i + 1) * chunksize).min(n);
                x.slice(scirs2_core::ndarray::s![start..end])
            })
            .collect();

        let partial_sums: Vec<F> = chunks
            .into_par_iter()
            .map(|chunk| {
                if self.capabilities.simd_available && chunk.len() > 64 {
                    F::simd_sum(&chunk)
                } else {
                    chunk.iter().fold(F::zero(), |acc, &val| acc + val)
                }
            })
            .collect();

        let total_sum = partial_sums
            .into_iter()
            .fold(F::zero(), |acc, val| acc + val);
        Ok(total_sum / F::from(n).unwrap())
    }

    fn mean_cache_optimal<D>(&self, x: &ArrayBase<D, Ix1>) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        Self::mean_cache_oblivious_static(x, 0, x.len())
    }

    #[allow(dead_code)]
    fn mean_cache_oblivious<D>(
        &self,
        x: &ArrayBase<D, Ix1>,
        start: usize,
        len: usize,
    ) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        Self::mean_cache_oblivious_static(x, start, len)
    }

    fn mean_cache_oblivious_static<D>(
        x: &ArrayBase<D, Ix1>,
        start: usize,
        len: usize,
    ) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
        F: Float + Send + Sync + 'static + std::fmt::Display,
    {
        const CACHE_THRESHOLD: usize = 1024; // base case small enough to stay in cache

        if len <= CACHE_THRESHOLD {
            let slice = x.slice(scirs2_core::ndarray::s![start..start + len]);
            let sum = slice.iter().fold(F::zero(), |acc, &val| acc + val);
            Ok(sum / F::from(len).unwrap())
        } else {
            // Divide and conquer: the recursion adapts to every level of the
            // cache hierarchy without knowing its sizes.
            let mid = len / 2;
            let left_result = Self::mean_cache_oblivious_static(x, start, mid)?;
            let right_result = Self::mean_cache_oblivious_static(x, start + mid, len - mid)?;

            // Combine the two halves as a weighted average.
            let left_weight = F::from(mid).unwrap();
            let right_weight = F::from(len - mid).unwrap();
            let total_weight = F::from(len).unwrap();

            Ok((left_result * left_weight + right_result * right_weight) / total_weight)
        }
    }

    fn mean_fixed_chunks<D>(&self, x: &ArrayBase<D, Ix1>, chunksize: usize) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let n = x.len();
        let chunksize = chunksize.max(1); // guard against ChunkStrategy::Fixed(0)

        // All full chunks, plus the remainder (if any) as a final short chunk.
        let chunks: Vec<_> = x
            .exact_chunks(chunksize)
            .into_iter()
            .chain(if !n.is_multiple_of(chunksize) {
                vec![x.slice(scirs2_core::ndarray::s![n - (n % chunksize)..])]
            } else {
                vec![]
            })
            .collect();

        let partial_sums: Vec<F> = chunks
            .into_par_iter()
            .map(|chunk| chunk.iter().fold(F::zero(), |acc, &val| acc + val))
            .collect();

        let total_sum = partial_sums
            .into_iter()
            .fold(F::zero(), |acc, val| acc + val);
        Ok(total_sum / F::from(n).unwrap())
    }

    fn variance_welford_parallel<D>(&self, x: &ArrayBase<D, Ix1>, ddof: usize) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let n = x.len();
        let num_threads = self
            .config
            .num_threads
            .unwrap_or_else(|| self.optimal_thread_count());
        let chunksize = n.div_ceil(num_threads);

        // Each thread runs Welford's algorithm over its chunk, producing a
        // (mean, M2, count) triple.
        let results: Vec<(F, F, usize)> = (0..num_threads)
            .into_par_iter()
            .map(|i| {
                let start = i * chunksize;
                let end = ((i + 1) * chunksize).min(n);

                if start >= end {
                    return (F::zero(), F::zero(), 0);
                }

                let chunk = x.slice(scirs2_core::ndarray::s![start..end]);
                let mut mean = F::zero();
                let mut m2 = F::zero();
                let count = chunk.len();

                for (j, &val) in chunk.iter().enumerate() {
                    let n = F::from(j + 1).unwrap();
                    let delta = val - mean;
                    mean = mean + delta / n;
                    let delta2 = val - mean;
                    m2 = m2 + delta * delta2;
                }

                (mean, m2, count)
            })
            .collect();

        // Merge the partial results with Chan et al.'s pairwise update:
        // M2 = M2_a + M2_b + delta^2 * n_a * n_b / n.
        let (_final_mean, final_m2, _final_count) = results.into_iter().fold(
            (F::zero(), F::zero(), 0),
            |(mean_a, m2_a, count_a), (mean_b, m2_b, count_b)| {
                if count_b == 0 {
                    return (mean_a, m2_a, count_a);
                }
                if count_a == 0 {
                    return (mean_b, m2_b, count_b);
                }

                let total_count = count_a + count_b;
                let count_a_f = F::from(count_a).unwrap();
                let count_b_f = F::from(count_b).unwrap();
                let total_count_f = F::from(total_count).unwrap();

                let delta = mean_b - mean_a;
                let combined_mean = (mean_a * count_a_f + mean_b * count_b_f) / total_count_f;
                let combined_m2 =
                    m2_a + m2_b + delta * delta * count_a_f * count_b_f / total_count_f;

                (combined_mean, combined_m2, total_count)
            },
        );

        Ok(final_m2 / F::from(n - ddof).unwrap())
    }

    fn correlation_matrix_parallel_upper_triangle<D>(
        &self,
        data: &ArrayBase<D, Ix2>,
        correlation_matrix: &mut Array2<F>,
    ) -> StatsResult<()>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let (_, n_features) = data.dim();

        // Enumerate the strictly-upper-triangle index pairs once, then
        // compute all pairwise correlations in parallel.
        let pairs: Vec<(usize, usize)> = (0..n_features)
            .flat_map(|i| (i + 1..n_features).map(move |j| (i, j)))
            .collect();

        let results: Vec<((usize, usize), F)> = pairs
            .into_par_iter()
            .map(|(i, j)| {
                let x = data.column(i);
                let y = data.column(j);
                let corr = crate::simd_enhanced_core::correlation_simd_enhanced(&x, &y)
                    .unwrap_or(F::zero());
                ((i, j), corr)
            })
            .collect();

        for ((i, j), corr) in results {
            correlation_matrix[[i, j]] = corr;
        }

        Ok(())
    }

    fn correlation_matrix_sequential<D>(
        &self,
        data: &ArrayBase<D, Ix2>,
        correlation_matrix: &mut Array2<F>,
    ) -> StatsResult<()>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let (_, n_features) = data.dim();

        for i in 0..n_features {
            for j in i + 1..n_features {
                let x = data.column(i);
                let y = data.column(j);
                let corr = crate::simd_enhanced_core::correlation_simd_enhanced(&x, &y)?;
                correlation_matrix[[i, j]] = corr;
            }
        }

        Ok(())
    }

    fn comprehensive_stats_single_pass_parallel<D>(
        &self,
        x: &ArrayBase<D, Ix1>,
        ddof: usize,
    ) -> StatsResult<ComprehensiveStats<F>>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let n = x.len();
        let num_threads = self
            .config
            .num_threads
            .unwrap_or_else(|| self.optimal_thread_count());
        let chunksize = n.div_ceil(num_threads);

        // Per chunk: mean plus the second, third, and fourth central moments
        // about the chunk's own mean.
        let results: Vec<(F, F, F, F, usize)> = (0..num_threads)
            .into_par_iter()
            .map(|i| {
                let start = i * chunksize;
                let end = ((i + 1) * chunksize).min(n);

                if start >= end {
                    return (F::zero(), F::zero(), F::zero(), F::zero(), 0);
                }

                let chunk = x.slice(scirs2_core::ndarray::s![start..end]);
                let count = chunk.len();
                let count_f = F::from(count).unwrap();

                let mean = chunk.iter().fold(F::zero(), |acc, &val| acc + val) / count_f;

                let (m2, m3, m4) =
                    chunk
                        .iter()
                        .fold((F::zero(), F::zero(), F::zero()), |(m2, m3, m4), &val| {
                            let dev = val - mean;
                            let dev2 = dev * dev;
                            let dev3 = dev2 * dev;
                            let dev4 = dev2 * dev2;
                            (m2 + dev2, m3 + dev3, m4 + dev4)
                        });

                (mean, m2, m3, m4, count)
            })
            .collect();

        // Merge the per-chunk moments pairwise (Pébay, 2008). Each chunk's
        // moments are centered on its own mean, so they cannot simply be
        // summed; the delta terms below re-center them.
        let (total_mean, total_m2, total_m3, total_m4, _total_count) = results.into_iter().fold(
            (F::zero(), F::zero(), F::zero(), F::zero(), 0),
            |(mean_a, m2_a, m3_a, m4_a, count_a), (mean_b, m2_b, m3_b, m4_b, count_b)| {
                if count_b == 0 {
                    return (mean_a, m2_a, m3_a, m4_a, count_a);
                }
                if count_a == 0 {
                    return (mean_b, m2_b, m3_b, m4_b, count_b);
                }

                let total_count = count_a + count_b;
                let na = F::from(count_a).unwrap();
                let nb = F::from(count_b).unwrap();
                let nt = F::from(total_count).unwrap();

                let delta = mean_b - mean_a;
                let delta2 = delta * delta;
                let three = F::from(3.0).unwrap();
                let four = F::from(4.0).unwrap();
                let six = F::from(6.0).unwrap();

                let mean = mean_a + delta * nb / nt;
                let m2 = m2_a + m2_b + delta2 * na * nb / nt;
                let m3 = m3_a + m3_b
                    + delta * delta2 * na * nb * (na - nb) / (nt * nt)
                    + three * delta * (na * m2_b - nb * m2_a) / nt;
                let m4 = m4_a + m4_b
                    + delta2 * delta2 * na * nb * (na * na - na * nb + nb * nb)
                        / (nt * nt * nt)
                    + six * delta2 * (na * na * m2_b + nb * nb * m2_a) / (nt * nt)
                    + four * delta * (na * m3_b - nb * m3_a) / nt;

                (mean, m2, m3, m4, total_count)
            },
        );

        let variance = total_m2 / F::from(n - ddof).unwrap();
        let std = variance.sqrt();

        let skewness = if variance > F::epsilon() {
            (total_m3 / F::from(n).unwrap()) / variance.powf(F::from(1.5).unwrap())
        } else {
            F::zero()
        };

        let kurtosis = if variance > F::epsilon() {
            // Excess kurtosis: subtract 3 so a normal distribution scores 0.
            (total_m4 / F::from(n).unwrap()) / (variance * variance) - F::from(3.0).unwrap()
        } else {
            F::zero()
        };

        Ok(ComprehensiveStats {
            mean: total_mean,
            variance,
            std,
            skewness,
            kurtosis,
            count: n,
        })
    }

    fn bootstrap_work_stealing<D>(
        &self,
        x: &ArrayBase<D, Ix1>,
        n_samples_: usize,
        samples_per_thread: usize,
        statistic_fn: impl Fn(&ArrayView1<F>) -> F + Send + Sync + Clone,
        seed: Option<u64>,
    ) -> StatsResult<Array1<F>>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        use scirs2_core::random::ChaCha8Rng;
        use scirs2_core::random::{Rng, SeedableRng};

        let num_threads = self
            .config
            .num_threads
            .unwrap_or_else(|| self.optimal_thread_count());

        let data_vec: Vec<F> = x.iter().cloned().collect();
        let data_arc = Arc::new(data_vec);

        let partial_results: Arc<Mutex<Vec<F>>> = Arc::new(Mutex::new(Vec::new()));

        crossbeam::scope(|s| {
            for thread_id in 0..num_threads {
                let data_arc = Arc::clone(&data_arc);
                let partial_results = Arc::clone(&partial_results);
                let statistic_fn = statistic_fn.clone();

                s.spawn(move |_| {
                    // Give each thread its own deterministic stream when a
                    // seed is provided, so runs are reproducible.
                    let mut rng = if let Some(seed) = seed {
                        ChaCha8Rng::seed_from_u64(seed + thread_id as u64)
                    } else {
                        ChaCha8Rng::from_rng(&mut scirs2_core::random::thread_rng())
                    };

                    let mut local_results = Vec::with_capacity(samples_per_thread);
                    let ndata = data_arc.len();

                    for _ in 0..samples_per_thread {
                        // Resample with replacement.
                        let bootstrap_indices: Vec<usize> =
                            (0..ndata).map(|_| rng.gen_range(0..ndata)).collect();

                        let bootstrap_sample: Vec<F> =
                            bootstrap_indices.into_iter().map(|i| data_arc[i]).collect();

                        let sample_array = Array1::from(bootstrap_sample);
                        let statistic = statistic_fn(&sample_array.view());
                        local_results.push(statistic);
                    }

                    partial_results.lock().unwrap().extend(local_results);
                });
            }
        })
        .unwrap();

        // Each thread produces `samples_per_thread` results, so the total can
        // exceed `n_samples_`; keep exactly the requested number.
        let mut all_results = partial_results.lock().unwrap();
        all_results.truncate(n_samples_);
        Ok(Array1::from(std::mem::take(&mut *all_results)))
    }
}


/// A minimal fixed-size thread pool driven by an mpsc job channel.
struct ThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    sender: std::sync::mpsc::Sender<Message>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NewJob(Job),
    Terminate,
}

impl ThreadPool {
    fn new(size: usize, _config: AdvancedParallelConfig) -> StatsResult<ThreadPool> {
        if size == 0 {
            return Err(StatsError::InvalidInput(
                "thread pool size must be at least 1".to_string(),
            ));
        }

        let (sender, receiver) = std::sync::mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);

        for _id in 0..size {
            let receiver = Arc::clone(&receiver);

            let worker = thread::spawn(move || loop {
                let message = receiver.lock().unwrap().recv().unwrap();

                match message {
                    Message::NewJob(job) => {
                        job();
                    }
                    Message::Terminate => {
                        break;
                    }
                }
            });

            workers.push(worker);
        }

        Ok(ThreadPool { workers, sender })
    }

    #[allow(dead_code)]
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Tell every worker to shut down...
        for _ in &self.workers {
            let _ = self.sender.send(Message::Terminate);
        }

        // ...then join each handle so no worker thread outlives the pool.
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}

/// Creates a processor with the default configuration.
#[allow(dead_code)]
pub fn create_advanced_parallel_processor<F>() -> AdvancedParallelProcessor<F>
where
    F: Float
        + NumCast
        + Send
        + Sync
        + SimdUnifiedOps
        + Copy
        + 'static
        + Zero
        + One
        + std::fmt::Debug
        + std::fmt::Display
        + std::iter::Sum<F>,
{
    AdvancedParallelProcessor::new(AdvancedParallelConfig::default())
}

/// Creates a processor with a caller-supplied configuration.
#[allow(dead_code)]
pub fn create_configured_parallel_processor<F>(
    config: AdvancedParallelConfig,
) -> AdvancedParallelProcessor<F>
where
    F: Float
        + NumCast
        + Send
        + Sync
        + SimdUnifiedOps
        + Copy
        + 'static
        + Zero
        + One
        + std::fmt::Debug
        + std::fmt::Display
        + std::iter::Sum<F>,
{
    AdvancedParallelProcessor::new(config)
}
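
// A minimal sanity-check sketch. It assumes `f64` satisfies this module's
// trait bounds (including `SimdUnifiedOps`), as it does elsewhere in the
// crate; the tolerances are illustrative.
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::Array1;

    #[test]
    fn parallel_mean_and_variance_match_closed_forms() {
        // For 0, 1, ..., n-1: the mean is (n - 1) / 2 and the sample
        // variance (ddof = 1) is n * (n + 1) / 12.
        let n = 100_000usize;
        let data = Array1::from_iter((0..n).map(|i| i as f64));
        let processor = create_advanced_parallel_processor::<f64>();

        let mean = processor.mean_parallel_advanced(&data).unwrap();
        let expected_mean = (n as f64 - 1.0) / 2.0;
        assert!((mean - expected_mean).abs() < 1e-6);

        let var = processor.variance_parallel_advanced(&data, 1).unwrap();
        let expected_var = (n as f64) * (n as f64 + 1.0) / 12.0;
        assert!((var - expected_var).abs() / expected_var < 1e-9);
    }
}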