//! Performance utilities: parallel signal processing, SIMD kernels, in-memory
//! and persistent caching, and GPU-accelerated metric computation.

use parking_lot::RwLock;
use scirs2_core::parallel_ops::*;
use std::collections::HashMap;
use std::hash::Hash;
use std::path::Path;
use std::sync::Arc;

use candle_core::{Device, Result as CandleResult, Tensor};

pub use gpu::{GpuAccelerator, GpuMemoryManager, SpectralFeatures as GpuSpectralFeatures};

/// Processes `data` in parallel chunks of `chunk_size`, returning one result
/// per chunk. Inputs no longer than a single chunk are processed directly
/// without touching the thread pool.
pub fn process_audio_chunks<T, F, R>(data: &[T], chunk_size: usize, processor: F) -> Vec<R>
where
    T: Send + Sync,
    F: Fn(&[T]) -> R + Send + Sync,
    R: Send,
{
    if data.len() <= chunk_size {
        return vec![processor(data)];
    }

    data.par_chunks(chunk_size).map(processor).collect()
}
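
// Usage sketch (illustrative only, not part of the test suite): per-chunk RMS
// over a long buffer. The closure runs on the Rayon-style thread pool provided
// by `scirs2_core::parallel_ops`.
//
//     let audio = vec![0.25f32; 48_000];
//     let rms: Vec<f32> = process_audio_chunks(&audio, 1024, |chunk| {
//         (chunk.iter().map(|x| x * x).sum::<f32>() / chunk.len() as f32).sqrt()
//     });
//     assert_eq!(rms.len(), 47); // 46 full chunks plus one partial tail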

/// Pearson correlation coefficient between two equal-length slices. Inputs
/// longer than 1000 samples use a single-pass parallel reduction; shorter
/// inputs fall back to the scalar implementation.
#[must_use]
pub fn parallel_correlation(x: &[f32], y: &[f32]) -> f32 {
    if x.len() != y.len() || x.is_empty() {
        return 0.0;
    }

    if x.len() > 1000 {
        // Accumulate the five sufficient statistics in one parallel pass.
        let (sum_xy, sum_x, sum_y, sum_x2, sum_y2) = x
            .par_iter()
            .zip(y.par_iter())
            .map(|(&xi, &yi)| (xi * yi, xi, yi, xi * xi, yi * yi))
            .reduce(
                || (0.0, 0.0, 0.0, 0.0, 0.0),
                |acc, item| {
                    (
                        acc.0 + item.0,
                        acc.1 + item.1,
                        acc.2 + item.2,
                        acc.3 + item.3,
                        acc.4 + item.4,
                    )
                },
            );

        let n = x.len() as f32;
        let numerator = n * sum_xy - sum_x * sum_y;
        let denominator = ((n * sum_x2 - sum_x * sum_x) * (n * sum_y2 - sum_y * sum_y)).sqrt();

        if denominator > f32::EPSILON {
            numerator / denominator
        } else {
            0.0
        }
    } else {
        crate::calculate_correlation(x, y)
    }
}
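
// Usage sketch (illustrative only): above 1000 samples the parallel reduction
// runs; below that, the scalar `crate::calculate_correlation` path is used.
//
//     let x: Vec<f32> = (0..10_000).map(|i| i as f32).collect();
//     let y: Vec<f32> = x.iter().map(|v| 2.0 * v + 1.0).collect();
//     let r = parallel_correlation(&x, &y);
//     assert!((r - 1.0).abs() < 1e-3); // perfectly linear relationship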

/// Computes magnitude spectra for a batch of signals in parallel using a
/// shared real-FFT planner. Empty signals map to empty spectra.
#[must_use]
pub fn parallel_fft_batch(signals: &[Vec<f32>]) -> Vec<Vec<f32>> {
    use scirs2_fft::{RealFftPlanner, RealToComplex};
    use std::sync::Mutex;

    // The planner caches FFT setups per length; it is guarded because
    // `plan_fft_forward` takes `&mut self` while the workers run concurrently.
    let planner = Arc::new(Mutex::new(RealFftPlanner::<f32>::new()));

    signals
        .par_iter()
        .map(|signal| {
            if signal.is_empty() {
                return Vec::new();
            }

            // Hold the lock only while planning; the transform runs unlocked.
            let mut planner_guard = planner.lock().unwrap();
            let fft = planner_guard.plan_fft_forward(signal.len());
            drop(planner_guard);

            let indata = signal.clone();
            let mut spectrum = vec![scirs2_core::Complex::new(0.0, 0.0); fft.output_len()];

            fft.process(&indata, &mut spectrum);

            spectrum.iter().map(|c| c.norm()).collect()
        })
        .collect()
}
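
// Usage sketch (illustrative only): magnitude spectra for a batch of frames.
// For an input of length `n`, `fft.output_len()` determines the number of
// bins, typically `n / 2 + 1` for a real-to-complex FFT.
//
//     let frames = vec![vec![0.0f32; 512]; 8];
//     let spectra = parallel_fft_batch(&frames);
//     assert_eq!(spectra.len(), 8);
//     assert_eq!(spectra[0].len(), 257); // 512 / 2 + 1, assuming a standard real FFT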

/// Normalized autocorrelation computed in parallel, one task per lag. For
/// valid inputs the result has `min(max_lag, signal.len() - 1) + 1` entries;
/// degenerate inputs yield `max_lag + 1` zeros.
#[must_use]
pub fn parallel_autocorrelation(signal: &[f32], max_lag: usize) -> Vec<f32> {
    if signal.len() < 2 || max_lag == 0 {
        return vec![0.0; max_lag + 1];
    }

    let effective_max_lag = max_lag.min(signal.len() - 1);

    (0..=effective_max_lag)
        .into_par_iter()
        .map(|lag| {
            let mut correlation = 0.0;
            let mut norm1 = 0.0;
            let mut norm2 = 0.0;

            for i in 0..(signal.len() - lag) {
                correlation += signal[i] * signal[i + lag];
                norm1 += signal[i] * signal[i];
                if lag == 0 {
                    norm2 = norm1;
                } else {
                    norm2 += signal[i + lag] * signal[i + lag];
                }
            }

            if norm1 > 0.0 && norm2 > 0.0 {
                correlation / (norm1 * norm2).sqrt()
            } else {
                0.0
            }
        })
        .collect()
}
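
// Usage sketch (illustrative only): the lag-0 coefficient of a non-zero signal
// is 1.0 by construction, and later lags fall off as the signal decorrelates.
//
//     let signal: Vec<f32> = (0..4096).map(|i| (i as f32 * 0.05).sin()).collect();
//     let ac = parallel_autocorrelation(&signal, 64);
//     assert_eq!(ac.len(), 65);
//     assert!((ac[0] - 1.0).abs() < 1e-4);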

/// A lightweight, thread-safe bounded cache. Despite the name, eviction is
/// coarse: when the map reaches `max_size`, the next insert clears the whole
/// cache instead of evicting only the least-recently-used entry.
pub struct LRUCache<K, V> {
    map: Arc<RwLock<HashMap<K, V>>>,
    max_size: usize,
}

impl<K, V> LRUCache<K, V>
where
    K: Eq + Hash + Clone,
    V: Clone,
{
    #[must_use]
    pub fn new(max_size: usize) -> Self {
        Self {
            map: Arc::new(RwLock::new(HashMap::new())),
            max_size,
        }
    }

    pub fn get(&self, key: &K) -> Option<V> {
        self.map.read().get(key).cloned()
    }

    pub fn insert(&self, key: K, value: V) {
        let mut map = self.map.write();

        // Coarse eviction: drop everything once capacity is reached.
        if map.len() >= self.max_size {
            map.clear();
        }

        map.insert(key, value);
    }

    pub fn clear(&self) {
        self.map.write().clear();
    }

    #[must_use]
    pub fn len(&self) -> usize {
        self.map.read().len()
    }

    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.map.read().is_empty()
    }
}
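
// Usage sketch (illustrative only). Note the coarse eviction policy: once the
// cache is full, the next insert clears everything before storing the new entry.
//
//     let cache: LRUCache<String, Vec<f32>> = LRUCache::new(128);
//     cache.insert("features:utt_0001".to_string(), vec![0.1, 0.2]);
//     assert_eq!(cache.get(&"features:utt_0001".to_string()), Some(vec![0.1, 0.2]));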

/// Two-tier cache: a bounded in-memory map backed by gzip-compressed files on
/// disk, so entries survive process restarts.
pub struct PersistentCache<K, V> {
    memory_cache: LRUCache<K, V>,
    cache_dir: std::path::PathBuf,
    compression_level: u32,
}

impl<K, V> PersistentCache<K, V>
where
    K: Eq + Hash + Clone + serde::Serialize + serde::de::DeserializeOwned,
    V: Clone + serde::Serialize + serde::de::DeserializeOwned,
{
    pub fn new<P: AsRef<Path>>(
        cache_dir: P,
        max_memory_size: usize,
        compression_level: u32,
    ) -> Result<Self, std::io::Error> {
        let cache_dir = cache_dir.as_ref().to_path_buf();

        if !cache_dir.exists() {
            std::fs::create_dir_all(&cache_dir)?;
        }

        Ok(Self {
            memory_cache: LRUCache::new(max_memory_size),
            cache_dir,
            // gzip accepts levels 0 (no compression) through 9 (best).
            compression_level: compression_level.clamp(0, 9),
        })
    }

    /// Looks up the memory tier first; on a disk hit the value is promoted
    /// back into memory before being returned.
    pub fn get(&self, key: &K) -> Option<V> {
        if let Some(value) = self.memory_cache.get(key) {
            return Some(value);
        }

        if let Ok(value) = self.load_from_disk(key) {
            self.memory_cache.insert(key.clone(), value.clone());
            return Some(value);
        }

        None
    }

    /// Writes through to both tiers; the disk write is the fallible part.
    pub fn insert(&self, key: K, value: V) -> Result<(), std::io::Error> {
        self.memory_cache.insert(key.clone(), value.clone());

        self.save_to_disk(&key, &value)
    }

    pub fn clear(&self) -> Result<(), std::io::Error> {
        self.memory_cache.clear();

        for entry in std::fs::read_dir(&self.cache_dir)? {
            let entry = entry?;
            if entry.path().is_file() {
                std::fs::remove_file(entry.path())?;
            }
        }

        Ok(())
    }

    #[must_use]
    pub fn memory_len(&self) -> usize {
        self.memory_cache.len()
    }

    /// Number of entries on disk (every entry is persisted, so this is the total).
    pub fn total_len(&self) -> usize {
        std::fs::read_dir(&self.cache_dir)
            .map(|entries| entries.filter_map(|e| e.ok()).count())
            .unwrap_or(0)
    }

    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.memory_cache.is_empty() && self.total_len() == 0
    }

    pub fn stats(&self) -> CacheStats {
        CacheStats {
            memory_entries: self.memory_cache.len(),
            disk_entries: self.total_len(),
            cache_dir_size: self.calculate_cache_dir_size(),
        }
    }

    pub fn set_compression_level(&mut self, level: u32) {
        self.compression_level = level.clamp(0, 9);
    }

    fn cache_key_to_filename(&self, key: &K) -> Result<String, std::io::Error> {
        let serialized = bincode::serde::encode_to_vec(key, bincode::config::standard())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        // `DefaultHasher::new()` is deterministic within a Rust release, which
        // suffices here; the hash is not guaranteed stable across releases.
        let hash = {
            use std::collections::hash_map::DefaultHasher;
            use std::hash::Hasher;
            let mut hasher = DefaultHasher::new();
            hasher.write(&serialized);
            hasher.finish()
        };
        Ok(format!("{:x}.cache", hash))
    }

    fn save_to_disk(&self, key: &K, value: &V) -> Result<(), std::io::Error> {
        let filename = self.cache_key_to_filename(key)?;
        let filepath = self.cache_dir.join(filename);

        let serialized = bincode::serde::encode_to_vec(value, bincode::config::standard())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;

        let mut encoder = flate2::write::GzEncoder::new(
            Vec::new(),
            flate2::Compression::new(self.compression_level),
        );
        std::io::Write::write_all(&mut encoder, &serialized)?;
        let compressed = encoder.finish()?;

        std::fs::write(filepath, compressed)
    }

    fn load_from_disk(&self, key: &K) -> Result<V, std::io::Error> {
        let filename = self.cache_key_to_filename(key)?;
        let filepath = self.cache_dir.join(filename);

        if !filepath.exists() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::NotFound,
                "Cache entry not found",
            ));
        }

        let compressed_data = std::fs::read(filepath)?;

        let mut decoder = flate2::read::GzDecoder::new(&compressed_data[..]);
        let mut decompressed = Vec::new();
        std::io::Read::read_to_end(&mut decoder, &mut decompressed)?;

        bincode::serde::decode_from_slice(&decompressed, bincode::config::standard())
            .map(|(v, _)| v)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }

    fn calculate_cache_dir_size(&self) -> u64 {
        std::fs::read_dir(&self.cache_dir)
            .map(|entries| {
                entries
                    .filter_map(|entry| entry.ok().and_then(|e| e.metadata().ok()).map(|m| m.len()))
                    .sum()
            })
            .unwrap_or(0)
    }
}
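
// Usage sketch (illustrative only): entries survive the cache value being
// dropped because every insert is also written (gzip-compressed) to disk.
//
//     let cache: PersistentCache<String, Vec<f32>> =
//         PersistentCache::new("/tmp/eval_cache", 256, 6)?;
//     cache.insert("mfcc:utt_0001".to_string(), vec![1.0, 2.0, 3.0])?;
//     // A new instance pointed at the same directory sees the entry again.
//     let reopened: PersistentCache<String, Vec<f32>> =
//         PersistentCache::new("/tmp/eval_cache", 256, 6)?;
//     assert!(reopened.get(&"mfcc:utt_0001".to_string()).is_some());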

#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Entries currently held in the in-memory tier.
    pub memory_entries: usize,
    /// Entries persisted on disk.
    pub disk_entries: usize,
    /// Total size of the cache directory in bytes.
    pub cache_dir_size: u64,
}

/// Splits a signal into overlapping windows (`window_size` samples, advanced
/// by `hop_size`) and processes them in parallel.
pub struct SlidingWindowProcessor<T> {
    window_size: usize,
    hop_size: usize,
    // Reserved for incremental/streaming processing; unused by the batch path.
    #[allow(dead_code)]
    buffer: Vec<T>,
}

impl<T> SlidingWindowProcessor<T>
where
    T: Clone + Default,
{
    #[must_use]
    pub fn new(window_size: usize, hop_size: usize) -> Self {
        Self {
            window_size,
            hop_size,
            buffer: Vec::with_capacity(window_size),
        }
    }

    pub fn process_parallel<F, R>(&self, data: &[T], processor: F) -> Vec<R>
    where
        T: Send + Sync,
        F: Fn(&[T]) -> R + Send + Sync,
        R: Send,
    {
        if data.len() < self.window_size {
            return Vec::new();
        }

        let num_windows = (data.len() - self.window_size) / self.hop_size + 1;

        (0..num_windows)
            .into_par_iter()
            .map(|i| {
                let start = i * self.hop_size;
                let end = (start + self.window_size).min(data.len());
                processor(&data[start..end])
            })
            .collect()
    }
}
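
// Usage sketch (illustrative only): a 25 ms window with a 10 ms hop at 16 kHz
// corresponds to `window_size = 400`, `hop_size = 160`; each window is handed
// to the closure in parallel.
//
//     let processor = SlidingWindowProcessor::new(400, 160);
//     let samples = vec![0.0f32; 16_000];
//     let energies = processor.process_parallel(&samples, |w| {
//         w.iter().map(|x| x * x).sum::<f32>()
//     });
//     assert_eq!(energies.len(), (16_000 - 400) / 160 + 1);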

/// SIMD-accelerated vector kernels with parallel fallbacks for large inputs
/// and scalar fallbacks for targets without AVX/SSE/NEON.
pub mod simd {
    use scirs2_core::parallel_ops::*;

    /// Dot product; lengths must match, otherwise 0.0 is returned.
    #[must_use]
    pub fn dot_product(a: &[f32], b: &[f32]) -> f32 {
        if a.len() != b.len() {
            return 0.0;
        }

        if a.len() > 1000 {
            a.par_iter().zip(b.par_iter()).map(|(&x, &y)| x * y).sum()
        } else {
            dot_product_simd(a, b)
        }
    }

    /// Element-wise product; returns an empty vector on length mismatch.
    #[must_use]
    pub fn element_wise_multiply(a: &[f32], b: &[f32]) -> Vec<f32> {
        if a.len() != b.len() {
            return Vec::new();
        }

        if a.len() > 1000 {
            a.par_iter()
                .zip(b.par_iter())
                .map(|(&x, &y)| x * y)
                .collect()
        } else {
            element_wise_multiply_simd(a, b)
        }
    }

    /// Root-mean-square of a signal; 0.0 for empty input.
    #[must_use]
    pub fn rms(signal: &[f32]) -> f32 {
        if signal.is_empty() {
            return 0.0;
        }

        let sum_squares = if signal.len() > 1000 {
            signal.par_iter().map(|&x| x * x).sum::<f32>()
        } else {
            rms_simd(signal)
        };

        (sum_squares / signal.len() as f32).sqrt()
    }

    /// Magnitude-weighted mean frequency of a half spectrum, in Hz.
    #[must_use]
    pub fn spectral_centroid(spectrum: &[f32], sample_rate: f32) -> f32 {
        if spectrum.is_empty() {
            return 0.0;
        }

        let (weighted_sum, magnitude_sum) = if spectrum.len() > 500 {
            spectrum
                .par_iter()
                .enumerate()
                .map(|(i, &magnitude)| {
                    let frequency = i as f32 * sample_rate / (2.0 * spectrum.len() as f32);
                    (frequency * magnitude, magnitude)
                })
                .reduce(|| (0.0, 0.0), |acc, item| (acc.0 + item.0, acc.1 + item.1))
        } else {
            spectral_centroid_simd(spectrum, sample_rate)
        };

        if magnitude_sum > 0.0 {
            weighted_sum / magnitude_sum
        } else {
            0.0
        }
    }

    /// Element-wise sum; returns an empty vector on length mismatch.
    #[must_use]
    pub fn vector_add(a: &[f32], b: &[f32]) -> Vec<f32> {
        if a.len() != b.len() {
            return Vec::new();
        }

        if a.len() > 1000 {
            a.par_iter()
                .zip(b.par_iter())
                .map(|(&x, &y)| x + y)
                .collect()
        } else {
            vector_add_simd(a, b)
        }
    }

    /// Element-wise difference; returns an empty vector on length mismatch.
    #[must_use]
    pub fn vector_subtract(a: &[f32], b: &[f32]) -> Vec<f32> {
        if a.len() != b.len() {
            return Vec::new();
        }

        if a.len() > 1000 {
            a.par_iter()
                .zip(b.par_iter())
                .map(|(&x, &y)| x - y)
                .collect()
        } else {
            vector_subtract_simd(a, b)
        }
    }
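
    // Usage sketch (illustrative only): all public helpers pick a parallel path
    // for large inputs and a SIMD (or scalar fallback) path for small ones, so
    // callers never select an instruction set themselves.
    //
    //     let a = vec![1.0f32, 2.0, 3.0, 4.0];
    //     let b = vec![4.0f32, 3.0, 2.0, 1.0];
    //     assert_eq!(simd::dot_product(&a, &b), 20.0);
    //     assert_eq!(simd::vector_add(&a, &b), vec![5.0; 4]);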

    #[cfg(target_arch = "x86_64")]
    mod x86_impl {
        #[cfg(target_feature = "avx")]
        pub fn dot_product_avx(a: &[f32], b: &[f32]) -> f32 {
            use std::arch::x86_64::*;

            let len = a.len();
            let chunks = len / 8;
            let mut sum = unsafe { _mm256_setzero_ps() };

            unsafe {
                for i in 0..chunks {
                    let a_vec = _mm256_loadu_ps(a.as_ptr().add(i * 8));
                    let b_vec = _mm256_loadu_ps(b.as_ptr().add(i * 8));
                    let mul = _mm256_mul_ps(a_vec, b_vec);
                    sum = _mm256_add_ps(sum, mul);
                }

                // Horizontal reduction: 256 -> 128 -> 64 -> 32 bits.
                let sum_low = _mm256_extractf128_ps(sum, 0);
                let sum_high = _mm256_extractf128_ps(sum, 1);
                let sum_128 = _mm_add_ps(sum_low, sum_high);

                let sum_64 = _mm_add_ps(sum_128, _mm_movehl_ps(sum_128, sum_128));
                let sum_32 = _mm_add_ss(sum_64, _mm_shuffle_ps(sum_64, sum_64, 1));

                let mut result = _mm_cvtss_f32(sum_32);

                // Scalar tail for lengths that are not a multiple of 8.
                for i in (chunks * 8)..len {
                    result += a[i] * b[i];
                }

                result
            }
        }

        #[cfg(target_feature = "sse")]
        pub fn dot_product_sse(a: &[f32], b: &[f32]) -> f32 {
            use std::arch::x86_64::*;

            let len = a.len();
            let chunks = len / 4;
            let mut sum = unsafe { _mm_setzero_ps() };

            unsafe {
                for i in 0..chunks {
                    let a_vec = _mm_loadu_ps(a.as_ptr().add(i * 4));
                    let b_vec = _mm_loadu_ps(b.as_ptr().add(i * 4));
                    let mul = _mm_mul_ps(a_vec, b_vec);
                    sum = _mm_add_ps(sum, mul);
                }

                let sum_64 = _mm_add_ps(sum, _mm_movehl_ps(sum, sum));
                let sum_32 = _mm_add_ss(sum_64, _mm_shuffle_ps(sum_64, sum_64, 1));

                let mut result = _mm_cvtss_f32(sum_32);

                for i in (chunks * 4)..len {
                    result += a[i] * b[i];
                }

                result
            }
        }

        #[cfg(target_feature = "avx")]
        pub fn element_wise_multiply_avx(a: &[f32], b: &[f32]) -> Vec<f32> {
            use std::arch::x86_64::*;

            let len = a.len();
            let chunks = len / 8;
            let mut result: Vec<f32> = Vec::with_capacity(len);

            unsafe {
                for i in 0..chunks {
                    let a_vec = _mm256_loadu_ps(a.as_ptr().add(i * 8));
                    let b_vec = _mm256_loadu_ps(b.as_ptr().add(i * 8));
                    let mul = _mm256_mul_ps(a_vec, b_vec);

                    let ptr = result.as_mut_ptr().add(i * 8);
                    _mm256_storeu_ps(ptr, mul);
                }

                // The vector lanes were written through raw pointers above.
                result.set_len(chunks * 8);

                for i in (chunks * 8)..len {
                    result.push(a[i] * b[i]);
                }
            }

            result
        }
    }

    #[cfg(target_arch = "aarch64")]
    mod arm_impl {
        #[cfg(target_feature = "neon")]
        pub fn dot_product_neon(a: &[f32], b: &[f32]) -> f32 {
            use std::arch::aarch64::*;

            let len = a.len();
            let chunks = len / 4;
            let mut sum = unsafe { vdupq_n_f32(0.0) };

            unsafe {
                for i in 0..chunks {
                    let a_vec = vld1q_f32(a.as_ptr().add(i * 4));
                    let b_vec = vld1q_f32(b.as_ptr().add(i * 4));
                    let mul = vmulq_f32(a_vec, b_vec);
                    sum = vaddq_f32(sum, mul);
                }

                let sum_pair = vadd_f32(vget_high_f32(sum), vget_low_f32(sum));
                let result_vec = vpadd_f32(sum_pair, sum_pair);
                let mut result = vget_lane_f32(result_vec, 0);

                for i in (chunks * 4)..len {
                    result += a[i] * b[i];
                }

                result
            }
        }

        #[cfg(target_feature = "neon")]
        pub fn element_wise_multiply_neon(a: &[f32], b: &[f32]) -> Vec<f32> {
            use std::arch::aarch64::*;

            let len = a.len();
            let chunks = len / 4;
            let mut result: Vec<f32> = Vec::with_capacity(len);

            unsafe {
                for i in 0..chunks {
                    let a_vec = vld1q_f32(a.as_ptr().add(i * 4));
                    let b_vec = vld1q_f32(b.as_ptr().add(i * 4));
                    let mul = vmulq_f32(a_vec, b_vec);

                    vst1q_f32(result.as_mut_ptr().add(i * 4), mul);
                }

                result.set_len(chunks * 4);

                for i in (chunks * 4)..len {
                    result.push(a[i] * b[i]);
                }
            }

            result
        }
    }

    #[inline]
    fn dot_product_simd(a: &[f32], b: &[f32]) -> f32 {
        #[cfg(all(target_arch = "x86_64", target_feature = "avx"))]
        {
            x86_impl::dot_product_avx(a, b)
        }
        #[cfg(all(
            target_arch = "x86_64",
            target_feature = "sse",
            not(target_feature = "avx")
        ))]
        {
            x86_impl::dot_product_sse(a, b)
        }
        #[cfg(all(target_arch = "aarch64", target_feature = "neon"))]
        {
            arm_impl::dot_product_neon(a, b)
        }
        #[cfg(not(any(
            all(
                target_arch = "x86_64",
                any(target_feature = "avx", target_feature = "sse")
            ),
            all(target_arch = "aarch64", target_feature = "neon")
        )))]
        {
            a.iter().zip(b.iter()).map(|(&x, &y)| x * y).sum()
        }
    }

    #[inline]
    fn element_wise_multiply_simd(a: &[f32], b: &[f32]) -> Vec<f32> {
        #[cfg(all(target_arch = "x86_64", target_feature = "avx"))]
        {
            x86_impl::element_wise_multiply_avx(a, b)
        }
        #[cfg(all(target_arch = "aarch64", target_feature = "neon"))]
        {
            arm_impl::element_wise_multiply_neon(a, b)
        }
        #[cfg(not(any(
            all(target_arch = "x86_64", target_feature = "avx"),
            all(target_arch = "aarch64", target_feature = "neon")
        )))]
        {
            a.iter().zip(b.iter()).map(|(&x, &y)| x * y).collect()
        }
    }

    /// Returns the *sum of squares* (a dot product of the signal with itself);
    /// the caller in `rms` divides by the length and takes the square root.
    #[inline]
    fn rms_simd(signal: &[f32]) -> f32 {
        dot_product_simd(signal, signal)
    }

    #[inline]
    fn spectral_centroid_simd(spectrum: &[f32], sample_rate: f32) -> (f32, f32) {
        spectrum
            .iter()
            .enumerate()
            .map(|(i, &magnitude)| {
                let frequency = i as f32 * sample_rate / (2.0 * spectrum.len() as f32);
                (frequency * magnitude, magnitude)
            })
            .fold((0.0, 0.0), |acc, item| (acc.0 + item.0, acc.1 + item.1))
    }

    #[inline]
    fn vector_add_simd(a: &[f32], b: &[f32]) -> Vec<f32> {
        #[cfg(all(target_arch = "x86_64", target_feature = "avx"))]
        {
            use std::arch::x86_64::*;

            let len = a.len();
            let chunks = len / 8;
            let mut result: Vec<f32> = Vec::with_capacity(len);

            unsafe {
                for i in 0..chunks {
                    let a_vec = _mm256_loadu_ps(a.as_ptr().add(i * 8));
                    let b_vec = _mm256_loadu_ps(b.as_ptr().add(i * 8));
                    let add = _mm256_add_ps(a_vec, b_vec);

                    let ptr = result.as_mut_ptr().add(i * 8);
                    _mm256_storeu_ps(ptr, add);
                }

                result.set_len(chunks * 8);

                for i in (chunks * 8)..len {
                    result.push(a[i] + b[i]);
                }
            }

            result
        }
        #[cfg(not(all(target_arch = "x86_64", target_feature = "avx")))]
        {
            a.iter().zip(b.iter()).map(|(&x, &y)| x + y).collect()
        }
    }

    #[inline]
    fn vector_subtract_simd(a: &[f32], b: &[f32]) -> Vec<f32> {
        #[cfg(all(target_arch = "x86_64", target_feature = "avx"))]
        {
            use std::arch::x86_64::*;

            let len = a.len();
            let chunks = len / 8;
            let mut result: Vec<f32> = Vec::with_capacity(len);

            unsafe {
                for i in 0..chunks {
                    let a_vec = _mm256_loadu_ps(a.as_ptr().add(i * 8));
                    let b_vec = _mm256_loadu_ps(b.as_ptr().add(i * 8));
                    let sub = _mm256_sub_ps(a_vec, b_vec);

                    let ptr = result.as_mut_ptr().add(i * 8);
                    _mm256_storeu_ps(ptr, sub);
                }

                result.set_len(chunks * 8);

                for i in (chunks * 8)..len {
                    result.push(a[i] - b[i]);
                }
            }

            result
        }
        #[cfg(not(all(target_arch = "x86_64", target_feature = "avx")))]
        {
            a.iter().zip(b.iter()).map(|(&x, &y)| x - y).collect()
        }
    }
}

/// Thread-safe collector of named wall-clock timings.
pub struct PerformanceMonitor {
    timings: Arc<RwLock<HashMap<String, Vec<std::time::Duration>>>>,
}

impl PerformanceMonitor {
    #[must_use]
    pub fn new() -> Self {
        Self {
            timings: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Runs `operation`, records its elapsed time under `name`, and returns
    /// the operation's result.
    pub fn time_operation<F, R>(&self, name: &str, operation: F) -> R
    where
        F: FnOnce() -> R,
    {
        let start = std::time::Instant::now();
        let result = operation();
        let duration = start.elapsed();

        self.timings
            .write()
            .entry(name.to_string())
            .or_default()
            .push(duration);

        result
    }

    #[must_use]
    pub fn get_average_time(&self, name: &str) -> Option<std::time::Duration> {
        let timings = self.timings.read();
        if let Some(times) = timings.get(name) {
            if times.is_empty() {
                None
            } else {
                let total: std::time::Duration = times.iter().sum();
                Some(total / times.len() as u32)
            }
        } else {
            None
        }
    }

    pub fn clear(&self) {
        self.timings.write().clear();
    }
}
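
// Usage sketch (illustrative only): wrap any closure to accumulate wall-clock
// timings under a named key, then query the running average.
//
//     let monitor = PerformanceMonitor::new();
//     let spectra = monitor.time_operation("fft_batch", || {
//         parallel_fft_batch(&[vec![0.0f32; 512]])
//     });
//     let avg = monitor.get_average_time("fft_batch");
//     assert!(avg.is_some());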

impl Default for PerformanceMonitor {
    fn default() -> Self {
        Self::new()
    }
}

/// GPU-accelerated metric computation built on candle tensors, with automatic
/// device selection and a simple tensor memory pool.
pub mod gpu {
    use super::{
        Arc, CandleResult, Device, IndexedParallelIterator, ParallelIterator, RwLock, Tensor,
    };
    use crate::EvaluationError;

    #[derive(Clone)]
    pub struct GpuAccelerator {
        device: Device,
        memory_pool: Arc<RwLock<Vec<Tensor>>>,
        max_memory_mb: usize,
    }

    impl GpuAccelerator {
        pub fn new() -> Result<Self, EvaluationError> {
            let device = Self::detect_best_device()?;
            let memory_pool = Arc::new(RwLock::new(Vec::new()));

            Ok(Self {
                device,
                memory_pool,
                // Default budget; tune with `set_max_memory`.
                max_memory_mb: 1024,
            })
        }

        #[must_use]
        pub fn with_device(device: Device) -> Self {
            Self {
                device,
                memory_pool: Arc::new(RwLock::new(Vec::new())),
                max_memory_mb: 1024,
            }
        }

        fn detect_best_device() -> Result<Device, EvaluationError> {
            // `cuda_if_available` falls back to the CPU device rather than
            // erroring, so only accept it when it actually returned CUDA;
            // otherwise the Metal probe below would never run.
            if let Ok(device) = Device::cuda_if_available(0) {
                if matches!(device, Device::Cuda(_)) {
                    return Ok(device);
                }
            }

            if let Ok(device) = Device::new_metal(0) {
                return Ok(device);
            }

            Ok(Device::Cpu)
        }

        #[must_use]
        pub fn device(&self) -> &Device {
            &self.device
        }

        #[must_use]
        pub fn is_gpu_available(&self) -> bool {
            !matches!(self.device, Device::Cpu)
        }

        /// Pearson correlation on the accelerator; returns 0.0 for mismatched
        /// or empty inputs.
        pub fn gpu_correlation(&self, x: &[f32], y: &[f32]) -> Result<f32, EvaluationError> {
            if x.len() != y.len() || x.is_empty() {
                return Ok(0.0);
            }

            let result = self.compute_correlation_tensor(x, y).map_err(|e| {
                EvaluationError::MetricCalculationError {
                    metric: "Correlation".to_string(),
                    message: format!("GPU correlation failed: {e}"),
                    source: None,
                }
            })?;

            Ok(result)
        }

        pub fn gpu_fft_batch(
            &self,
            signals: &[Vec<f32>],
        ) -> Result<Vec<Vec<f32>>, EvaluationError> {
            if signals.is_empty() {
                return Ok(Vec::new());
            }

            let result = self.compute_fft_batch_tensor(signals).map_err(|e| {
                EvaluationError::MetricCalculationError {
                    metric: "FFT".to_string(),
                    message: format!("GPU FFT batch failed: {e}"),
                    source: None,
                }
            })?;

            Ok(result)
        }

        pub fn gpu_spectral_analysis(
            &self,
            signal: &[f32],
            sample_rate: f32,
        ) -> Result<SpectralFeatures, EvaluationError> {
            let features = self
                .compute_spectral_features(signal, sample_rate)
                .map_err(|e| EvaluationError::MetricCalculationError {
                    metric: "SpectralAnalysis".to_string(),
                    message: format!("GPU spectral analysis failed: {e}"),
                    source: None,
                })?;

            Ok(features)
        }

        /// Mel-cepstral distortion between two MFCC sequences.
        pub fn gpu_mcd(
            &self,
            x_mfcc: &[Vec<f32>],
            y_mfcc: &[Vec<f32>],
        ) -> Result<f32, EvaluationError> {
            let mcd = self.compute_mcd_tensor(x_mfcc, y_mfcc).map_err(|e| {
                EvaluationError::MetricCalculationError {
                    metric: "MCD".to_string(),
                    message: format!("GPU MCD calculation failed: {e}"),
                    source: None,
                }
            })?;

            Ok(mcd)
        }

        pub fn gpu_autocorrelation(
            &self,
            signal: &[f32],
            max_lag: usize,
        ) -> Result<Vec<f32>, EvaluationError> {
            let result = self
                .compute_autocorrelation_tensor(signal, max_lag)
                .map_err(|e| EvaluationError::MetricCalculationError {
                    metric: "Autocorrelation".to_string(),
                    message: format!("GPU autocorrelation failed: {e}"),
                    source: None,
                })?;

            Ok(result)
        }

        fn compute_correlation_tensor(&self, x: &[f32], y: &[f32]) -> CandleResult<f32> {
            let x_tensor = Tensor::from_slice(x, x.len(), &self.device)?;
            let y_tensor = Tensor::from_slice(y, y.len(), &self.device)?;

            let x_mean_val = x_tensor.mean_all()?.to_scalar::<f32>()?;
            let y_mean_val = y_tensor.mean_all()?.to_scalar::<f32>()?;

            // Centering currently round-trips through host memory; the
            // reductions below run on the selected device.
            let x_centered = x_tensor
                .to_vec1::<f32>()?
                .iter()
                .map(|&v| v - x_mean_val)
                .collect::<Vec<_>>();
            let y_centered = y_tensor
                .to_vec1::<f32>()?
                .iter()
                .map(|&v| v - y_mean_val)
                .collect::<Vec<_>>();

            let x_centered_tensor =
                Tensor::from_slice(&x_centered, x_centered.len(), &self.device)?;
            let y_centered_tensor =
                Tensor::from_slice(&y_centered, y_centered.len(), &self.device)?;

            let numerator = x_centered_tensor.mul(&y_centered_tensor)?.sum_all()?;
            let x_sq_sum = x_centered_tensor.mul(&x_centered_tensor)?.sum_all()?;
            let y_sq_sum = y_centered_tensor.mul(&y_centered_tensor)?.sum_all()?;

            let denominator = x_sq_sum.mul(&y_sq_sum)?.sqrt()?;

            let correlation = if denominator.to_scalar::<f32>()? > f32::EPSILON {
                numerator.div(&denominator)?.to_scalar::<f32>()?
            } else {
                0.0
            };

            Ok(correlation)
        }

        fn compute_fft_batch_tensor(&self, signals: &[Vec<f32>]) -> CandleResult<Vec<Vec<f32>>> {
            let mut results = Vec::new();

            for signal in signals {
                if signal.is_empty() {
                    results.push(Vec::new());
                    continue;
                }

                let signal_tensor = Tensor::from_slice(signal, signal.len(), &self.device)?;

                let spectrum = self.compute_magnitude_spectrum(&signal_tensor)?;
                results.push(spectrum);
            }

            Ok(results)
        }

        /// Approximates a magnitude spectrum with a linearly decaying envelope
        /// scaled by the signal's mean absolute amplitude. This is a cheap
        /// stand-in, not a true FFT; callers needing real spectra should use
        /// `parallel_fft_batch`.
        fn compute_magnitude_spectrum(&self, signal: &Tensor) -> CandleResult<Vec<f32>> {
            let signal_data = signal.to_vec1::<f32>()?;
            let spectrum_size = signal_data.len() / 2 + 1;
            let mut spectrum = vec![0.0; spectrum_size];

            for (i, value) in spectrum.iter_mut().enumerate() {
                let frequency_ratio = i as f32 / spectrum_size as f32;
                *value = (1.0 - frequency_ratio) * signal_data.iter().map(|x| x.abs()).sum::<f32>()
                    / signal_data.len() as f32;
            }

            Ok(spectrum)
        }

        fn compute_spectral_features(
            &self,
            signal: &[f32],
            sample_rate: f32,
        ) -> CandleResult<SpectralFeatures> {
            let signal_tensor = Tensor::from_slice(signal, signal.len(), &self.device)?;

            let magnitude_spectrum = self.compute_magnitude_spectrum(&signal_tensor)?;

            // Spectral centroid: magnitude-weighted mean frequency.
            let mut weighted_sum = 0.0;
            let mut total_energy = 0.0;

            for (i, &energy) in magnitude_spectrum.iter().enumerate() {
                let frequency = i as f32 * sample_rate / (2.0 * magnitude_spectrum.len() as f32);
                weighted_sum += frequency * energy;
                total_energy += energy;
            }

            let centroid = if total_energy > 0.0 {
                weighted_sum / total_energy
            } else {
                0.0
            };

            // Rolloff: frequency below which 85% of the energy lies.
            let mut cumulative_energy = 0.0;
            let target_energy = total_energy * 0.85;
            let mut rolloff = sample_rate / 2.0;

            for (i, &energy) in magnitude_spectrum.iter().enumerate() {
                cumulative_energy += energy;
                if cumulative_energy >= target_energy {
                    rolloff = i as f32 * sample_rate / (2.0 * magnitude_spectrum.len() as f32);
                    break;
                }
            }

            // Spread: energy-weighted standard deviation around the centroid.
            let mut spread_sum = 0.0;
            for (i, &energy) in magnitude_spectrum.iter().enumerate() {
                let frequency = i as f32 * sample_rate / (2.0 * magnitude_spectrum.len() as f32);
                spread_sum += (frequency - centroid).powi(2) * energy;
            }

            let spread = if total_energy > 0.0 {
                (spread_sum / total_energy).sqrt()
            } else {
                0.0
            };

            Ok(SpectralFeatures {
                centroid,
                spread,
                rolloff,
                // Flux needs a previous frame, which a single-signal analysis
                // does not have.
                flux: 0.0,
                energy: total_energy,
            })
        }

        fn compute_mcd_tensor(
            &self,
            x_mfcc: &[Vec<f32>],
            y_mfcc: &[Vec<f32>],
        ) -> CandleResult<f32> {
            if x_mfcc.is_empty() || y_mfcc.is_empty() {
                return Ok(f32::INFINITY);
            }

            let min_frames = x_mfcc.len().min(y_mfcc.len());
            let mut total_distance = 0.0;
            let mut valid_frames = 0;

            for i in 0..min_frames {
                if x_mfcc[i].len() == y_mfcc[i].len() && !x_mfcc[i].is_empty() {
                    let x_tensor = Tensor::from_slice(&x_mfcc[i], x_mfcc[i].len(), &self.device)?;
                    let y_tensor = Tensor::from_slice(&y_mfcc[i], y_mfcc[i].len(), &self.device)?;

                    // Skip the 0th (energy) coefficient, as is conventional for MCD.
                    let x_ceps = x_tensor.narrow(0, 1, x_mfcc[i].len() - 1)?;
                    let y_ceps = y_tensor.narrow(0, 1, y_mfcc[i].len() - 1)?;

                    let diff = x_ceps.sub(&y_ceps)?;
                    let squared_diff = diff.mul(&diff)?;
                    let sum_squared = squared_diff.sum_all()?;

                    let distance = sum_squared.sqrt()?.to_scalar::<f32>()?;
                    total_distance += distance;
                    valid_frames += 1;
                }
            }

            if valid_frames > 0 {
                let mcd = (10.0 / std::f32::consts::LN_10) * (total_distance / valid_frames as f32);
                Ok(mcd)
            } else {
                Ok(f32::INFINITY)
            }
        }

        fn compute_autocorrelation_tensor(
            &self,
            signal: &[f32],
            max_lag: usize,
        ) -> CandleResult<Vec<f32>> {
            if signal.len() < 2 || max_lag == 0 {
                return Ok(vec![0.0; max_lag + 1]);
            }

            let signal_tensor = Tensor::from_slice(signal, signal.len(), &self.device)?;
            let effective_max_lag = max_lag.min(signal.len() - 1);
            let mut autocorr = vec![0.0; effective_max_lag + 1];

            for lag in 0..=effective_max_lag {
                if lag < signal.len() {
                    let signal1 = signal_tensor.narrow(0, 0, signal.len() - lag)?;
                    let signal2 = signal_tensor.narrow(0, lag, signal.len() - lag)?;

                    let correlation = signal1.mul(&signal2)?.sum_all()?;
                    let norm1 = signal1.mul(&signal1)?.sum_all()?.sqrt()?;
                    let norm2 = signal2.mul(&signal2)?.sum_all()?.sqrt()?;

                    let denominator = norm1.mul(&norm2)?.to_scalar::<f32>()?;
                    if denominator > 0.0 {
                        autocorr[lag] = correlation.to_scalar::<f32>()? / denominator;
                    }
                }
            }

            Ok(autocorr)
        }

        pub fn clear_memory_pool(&self) {
            self.memory_pool.write().clear();
        }

        pub fn set_max_memory(&mut self, max_memory_mb: usize) {
            self.max_memory_mb = max_memory_mb;
        }
    }
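
    // Usage sketch (illustrative only): `default()` falls back to the CPU device
    // when no CUDA or Metal backend is available, so the same call sites work on
    // GPU-less machines.
    //
    //     let acc = GpuAccelerator::default();
    //     let x = vec![1.0f32, 2.0, 3.0, 4.0];
    //     let r = acc.gpu_correlation(&x, &x)?;
    //     assert!((r - 1.0).abs() < 1e-3);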

    impl Default for GpuAccelerator {
        fn default() -> Self {
            Self::new().unwrap_or_else(|_| Self::with_device(Device::Cpu))
        }
    }

    #[derive(Debug, Clone)]
    pub struct SpectralFeatures {
        /// Magnitude-weighted mean frequency (Hz).
        pub centroid: f32,
        /// Energy-weighted standard deviation around the centroid (Hz).
        pub spread: f32,
        /// Frequency below which 85% of the spectral energy lies (Hz).
        pub rolloff: f32,
        /// Frame-to-frame spectral change; 0.0 for single-frame analysis.
        pub flux: f32,
        /// Total spectral energy.
        pub energy: f32,
    }

    /// Minimal tensor pool: hands out zero-initialized f32 tensors and retains
    /// up to `max_cache_size` of them to keep their allocations alive.
    pub struct GpuMemoryManager {
        device: Device,
        allocated_tensors: Arc<RwLock<Vec<Tensor>>>,
        max_cache_size: usize,
    }

    impl GpuMemoryManager {
        #[must_use]
        pub fn new(device: Device, max_cache_size: usize) -> Self {
            Self {
                device,
                allocated_tensors: Arc::new(RwLock::new(Vec::new())),
                max_cache_size,
            }
        }

        pub fn allocate_tensor(&self, shape: &[usize]) -> CandleResult<Tensor> {
            let tensor = Tensor::zeros(shape, candle_core::DType::F32, &self.device)?;

            let mut cache = self.allocated_tensors.write();
            if cache.len() < self.max_cache_size {
                cache.push(tensor.clone());
            }

            Ok(tensor)
        }

        pub fn clear_cache(&self) {
            self.allocated_tensors.write().clear();
        }
    }
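
    // Usage sketch (illustrative only): the manager hands out zero-initialized
    // f32 tensors on its device and retains up to `max_cache_size` clones to
    // keep the underlying allocations alive between uses.
    //
    //     let manager = GpuMemoryManager::new(Device::Cpu, 8);
    //     let t = manager.allocate_tensor(&[128, 80])?;
    //     assert_eq!(t.shape().dims(), &[128, 80]);
    //     manager.clear_cache();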
}

/// Distributes batched GPU work across all detected devices with a simple
/// round-robin load balancer and a semaphore bounding concurrent operations.
pub mod multi_gpu {
    use super::gpu::{GpuAccelerator, SpectralFeatures};
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::sync::Semaphore;

    pub struct MultiGpuManager {
        accelerators: Vec<GpuAccelerator>,
        load_balancer: AtomicUsize,
        semaphore: Arc<Semaphore>,
        max_concurrent_ops: usize,
    }

    impl MultiGpuManager {
        pub fn new(max_concurrent_ops: usize) -> Result<Self, crate::EvaluationError> {
            let accelerators = Self::detect_all_devices()?;
            let semaphore = Arc::new(Semaphore::new(max_concurrent_ops));

            Ok(Self {
                accelerators,
                load_balancer: AtomicUsize::new(0),
                semaphore,
                max_concurrent_ops,
            })
        }

        pub fn with_devices(devices: Vec<Device>, max_concurrent_ops: usize) -> Self {
            let accelerators = devices
                .into_iter()
                .map(GpuAccelerator::with_device)
                .collect();
            let semaphore = Arc::new(Semaphore::new(max_concurrent_ops));

            Self {
                accelerators,
                load_balancer: AtomicUsize::new(0),
                semaphore,
                max_concurrent_ops,
            }
        }

        #[must_use]
        pub fn device_count(&self) -> usize {
            self.accelerators.len()
        }

        #[must_use]
        pub fn has_gpu_acceleration(&self) -> bool {
            self.accelerators.iter().any(|acc| acc.is_gpu_available())
        }

        pub fn device_info(&self) -> Vec<String> {
            self.accelerators
                .iter()
                .enumerate()
                .map(|(idx, acc)| {
                    format!(
                        "Device {}: {} (GPU: {})",
                        idx,
                        match acc.device() {
                            Device::Cpu => "CPU",
                            Device::Cuda(_) => "CUDA",
                            Device::Metal(_) => "Metal",
                        },
                        acc.is_gpu_available()
                    )
                })
                .collect()
        }

        /// Splits the pairs across devices, runs each chunk on its own Tokio
        /// task (throttled by the semaphore), and concatenates results in order.
        pub async fn distributed_correlation_batch(
            &self,
            data_pairs: &[(Vec<f32>, Vec<f32>)],
        ) -> Result<Vec<f32>, crate::EvaluationError> {
            if data_pairs.is_empty() {
                return Ok(Vec::new());
            }

            // Ceiling division so every pair lands in some chunk.
            let chunk_size =
                (data_pairs.len() + self.accelerators.len() - 1) / self.accelerators.len();
            let chunks: Vec<_> = data_pairs.chunks(chunk_size).collect();

            let mut handles = Vec::new();

            for (device_idx, chunk) in chunks.into_iter().enumerate() {
                let device_idx = device_idx % self.accelerators.len();
                let accelerator = &self.accelerators[device_idx];
                let chunk_data = chunk.to_vec();

                let permit = Arc::clone(&self.semaphore)
                    .acquire_owned()
                    .await
                    .map_err(|_| crate::EvaluationError::MetricCalculationError {
                        metric: "MultiGPU".to_string(),
                        message: "Failed to acquire semaphore permit".to_string(),
                        source: None,
                    })?;

                let accelerator_clone = accelerator.clone();
                let handle = tokio::spawn(async move {
                    let _permit = permit;
                    let mut results = Vec::new();

                    for (x, y) in chunk_data {
                        match accelerator_clone.gpu_correlation(&x, &y) {
                            Ok(corr) => results.push(corr),
                            Err(e) => return Err(e),
                        }
                    }

                    Ok(results)
                });

                handles.push(handle);
            }

            let mut all_results = Vec::new();
            for handle in handles {
                let chunk_results = handle.await.map_err(|e| {
                    crate::EvaluationError::MetricCalculationError {
                        metric: "MultiGPU".to_string(),
                        message: format!("GPU task failed: {e}"),
                        source: None,
                    }
                })??;
                all_results.extend(chunk_results);
            }

            Ok(all_results)
        }

        pub async fn distributed_fft_batch(
            &self,
            signals: &[Vec<f32>],
        ) -> Result<Vec<Vec<f32>>, crate::EvaluationError> {
            if signals.is_empty() {
                return Ok(Vec::new());
            }

            let chunk_size =
                (signals.len() + self.accelerators.len() - 1) / self.accelerators.len();
            let chunks: Vec<_> = signals.chunks(chunk_size).collect();

            let mut handles = Vec::new();

            for (device_idx, chunk) in chunks.into_iter().enumerate() {
                let device_idx = device_idx % self.accelerators.len();
                let accelerator = &self.accelerators[device_idx];
                let chunk_data = chunk.to_vec();

                let permit = Arc::clone(&self.semaphore)
                    .acquire_owned()
                    .await
                    .map_err(|_| crate::EvaluationError::MetricCalculationError {
                        metric: "MultiGPU".to_string(),
                        message: "Failed to acquire semaphore permit".to_string(),
                        source: None,
                    })?;

                let accelerator_clone = accelerator.clone();
                let handle = tokio::spawn(async move {
                    let _permit = permit;
                    accelerator_clone.gpu_fft_batch(&chunk_data)
                });

                handles.push(handle);
            }

            let mut all_results = Vec::new();
            for handle in handles {
                let chunk_results = handle.await.map_err(|e| {
                    crate::EvaluationError::MetricCalculationError {
                        metric: "MultiGPU".to_string(),
                        message: format!("GPU FFT task failed: {e}"),
                        source: None,
                    }
                })??;
                all_results.extend(chunk_results);
            }

            Ok(all_results)
        }

        pub async fn distributed_spectral_analysis(
            &self,
            signals: &[Vec<f32>],
            sample_rate: f32,
        ) -> Result<Vec<SpectralFeatures>, crate::EvaluationError> {
            if signals.is_empty() {
                return Ok(Vec::new());
            }

            let chunk_size =
                (signals.len() + self.accelerators.len() - 1) / self.accelerators.len();
            let chunks: Vec<_> = signals.chunks(chunk_size).collect();

            let mut handles = Vec::new();

            for (device_idx, chunk) in chunks.into_iter().enumerate() {
                let device_idx = device_idx % self.accelerators.len();
                let accelerator = &self.accelerators[device_idx];
                let chunk_data = chunk.to_vec();

                let permit = Arc::clone(&self.semaphore)
                    .acquire_owned()
                    .await
                    .map_err(|_| crate::EvaluationError::MetricCalculationError {
                        metric: "MultiGPU".to_string(),
                        message: "Failed to acquire semaphore permit".to_string(),
                        source: None,
                    })?;

                let accelerator_clone = accelerator.clone();
                let handle = tokio::spawn(async move {
                    let _permit = permit;
                    let mut results = Vec::new();

                    for signal in chunk_data {
                        match accelerator_clone.gpu_spectral_analysis(&signal, sample_rate) {
                            Ok(features) => results.push(features),
                            Err(e) => return Err(e),
                        }
                    }

                    Ok(results)
                });

                handles.push(handle);
            }

            let mut all_results = Vec::new();
            for handle in handles {
                let chunk_results = handle.await.map_err(|e| {
                    crate::EvaluationError::MetricCalculationError {
                        metric: "MultiGPU".to_string(),
                        message: format!("GPU spectral analysis task failed: {e}"),
                        source: None,
                    }
                })??;
                all_results.extend(chunk_results);
            }

            Ok(all_results)
        }

        /// Round-robin device selection backed by an atomic counter.
        pub fn get_next_device(&self) -> &GpuAccelerator {
            let index =
                self.load_balancer.fetch_add(1, Ordering::Relaxed) % self.accelerators.len();
            &self.accelerators[index]
        }

        pub fn clear_all_memory_pools(&self) {
            for accelerator in &self.accelerators {
                accelerator.clear_memory_pool();
            }
        }

        pub fn set_max_memory_all(&mut self, max_memory_mb: usize) {
            for accelerator in &mut self.accelerators {
                accelerator.set_max_memory(max_memory_mb);
            }
        }

        fn detect_all_devices() -> Result<Vec<GpuAccelerator>, crate::EvaluationError> {
            let mut accelerators = Vec::new();

            // Probe up to 8 CUDA ordinals. `cuda_if_available` falls back to the
            // CPU device instead of erroring, so stop at the first non-CUDA
            // result to avoid registering phantom devices.
            for device_id in 0..8 {
                match Device::cuda_if_available(device_id) {
                    Ok(device) if matches!(device, Device::Cuda(_)) => {
                        accelerators.push(GpuAccelerator::with_device(device));
                    }
                    _ => break,
                }
            }

            // Probe up to 4 Metal ordinals.
            for device_id in 0..4 {
                if let Ok(device) = Device::new_metal(device_id) {
                    accelerators.push(GpuAccelerator::with_device(device));
                } else {
                    break;
                }
            }

            if accelerators.is_empty() {
                accelerators.push(GpuAccelerator::with_device(Device::Cpu));
            }

            Ok(accelerators)
        }
    }

    impl Clone for MultiGpuManager {
        fn clone(&self) -> Self {
            Self {
                accelerators: self.accelerators.clone(),
                load_balancer: AtomicUsize::new(self.load_balancer.load(Ordering::Relaxed)),
                // A fresh semaphore: permits are not shared with the original.
                semaphore: Arc::new(Semaphore::new(self.max_concurrent_ops)),
                max_concurrent_ops: self.max_concurrent_ops,
            }
        }
    }
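
    // Usage sketch (illustrative only, requires a Tokio runtime): batches are
    // chunked across all detected devices and throttled by the semaphore.
    //
    //     let manager = MultiGpuManager::new(4)?;
    //     let pairs = vec![(vec![1.0f32, 2.0, 3.0], vec![1.0f32, 2.0, 3.0])];
    //     let correlations = manager.distributed_correlation_batch(&pairs).await?;
    //     assert_eq!(correlations.len(), 1);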

    #[derive(Debug, Clone)]
    pub struct MultiGpuMetrics {
        /// Total operations executed across all devices.
        pub total_operations: usize,
        /// Operations executed per device, indexed by device.
        pub operations_per_device: Vec<usize>,
        /// Average wall-clock time per operation, per device.
        pub avg_time_per_device: Vec<std::time::Duration>,
        /// Memory usage per device.
        pub memory_usage_per_device: Vec<u64>,
        /// Aggregate throughput.
        pub throughput: f64,
    }

    impl MultiGpuMetrics {
        #[must_use]
        pub fn new(device_count: usize) -> Self {
            Self {
                total_operations: 0,
                operations_per_device: vec![0; device_count],
                avg_time_per_device: vec![std::time::Duration::ZERO; device_count],
                memory_usage_per_device: vec![0; device_count],
                throughput: 0.0,
            }
        }

        /// 1.0 means a perfectly even distribution; the value falls toward 0.0
        /// as per-device operation counts diverge (1 minus the coefficient of
        /// variation, clamped at zero).
        #[must_use]
        pub fn load_balance_efficiency(&self) -> f32 {
            if self.operations_per_device.is_empty() {
                return 1.0;
            }

            let avg_ops = self.operations_per_device.iter().sum::<usize>() as f32
                / self.operations_per_device.len() as f32;

            if avg_ops == 0.0 {
                return 1.0;
            }

            let variance = self
                .operations_per_device
                .iter()
                .map(|&ops| {
                    let diff = ops as f32 - avg_ops;
                    diff * diff
                })
                .sum::<f32>()
                / self.operations_per_device.len() as f32;

            let coefficient_of_variation = variance.sqrt() / avg_ops;
            (1.0 - coefficient_of_variation).max(0.0)
        }

        /// Index of the device with the smallest non-zero average time, if any.
        #[must_use]
        pub fn fastest_device(&self) -> Option<usize> {
            self.avg_time_per_device
                .iter()
                .enumerate()
                .filter(|(_, &time)| time > std::time::Duration::ZERO)
                .min_by_key(|(_, &time)| time)
                .map(|(idx, _)| idx)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_correlation() {
        let x = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let y = vec![1.0, 2.0, 3.0, 4.0, 5.0];

        let corr = parallel_correlation(&x, &y);
        assert!((corr - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_lru_cache() {
        let cache = LRUCache::new(2);

        cache.insert("key1", "value1");
        cache.insert("key2", "value2");

        assert_eq!(cache.get(&"key1"), Some("value1"));
        assert_eq!(cache.get(&"key2"), Some("value2"));

        // The cache is full, so this insert clears it first; only key3 remains.
        cache.insert("key3", "value3");
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn test_sliding_window_processor() {
        let processor = SlidingWindowProcessor::new(3, 1);
        let data = vec![1, 2, 3, 4, 5];

        let results = processor.process_parallel(&data, |window| window.iter().sum::<i32>());

        // Windows: [1,2,3], [2,3,4], [3,4,5].
        assert_eq!(results, vec![6, 9, 12]);
    }

    #[test]
    fn test_simd_operations() {
        let a = vec![1.0, 2.0, 3.0, 4.0];
        let b = vec![2.0, 3.0, 4.0, 5.0];

        let dot = simd::dot_product(&a, &b);
        assert_eq!(dot, 40.0);

        let product = simd::element_wise_multiply(&a, &b);
        assert_eq!(product, vec![2.0, 6.0, 12.0, 20.0]);

        let rms = simd::rms(&a);
        assert!((rms - 2.738_613).abs() < 0.001);
    }

    #[test]
    fn test_simd_vector_add() {
        let a = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0];
        let b = vec![1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];

        let result = simd::vector_add(&a, &b);
        assert_eq!(result, vec![2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]);

        let empty_result = simd::vector_add(&[], &[]);
        assert_eq!(empty_result, Vec::<f32>::new());

        let mismatch_result = simd::vector_add(&a, &[1.0]);
        assert_eq!(mismatch_result, Vec::<f32>::new());
    }

    #[test]
    fn test_simd_vector_subtract() {
        let a = vec![5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0];
        let b = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0];

        let result = simd::vector_subtract(&a, &b);
        assert_eq!(result, vec![4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0]);

        let empty_result = simd::vector_subtract(&[], &[]);
        assert_eq!(empty_result, Vec::<f32>::new());

        let mismatch_result = simd::vector_subtract(&a, &[1.0]);
        assert_eq!(mismatch_result, Vec::<f32>::new());
    }

    #[test]
    fn test_simd_large_vectors() {
        // Above the 1000-element threshold, so the parallel paths are exercised.
        let size = 2000;
        let a: Vec<f32> = (0..size).map(|i| i as f32).collect();
        let b: Vec<f32> = (0..size).map(|i| (i + 1) as f32).collect();

        let dot = simd::dot_product(&a, &b);
        let expected_dot: f32 = a.iter().zip(&b).map(|(x, y)| x * y).sum();
        let tolerance = (expected_dot * 1e-5).max(1.0);
        assert!((dot - expected_dot).abs() < tolerance);

        let product = simd::element_wise_multiply(&a, &b);
        let expected_product: Vec<f32> = a.iter().zip(&b).map(|(x, y)| x * y).collect();
        assert_eq!(product.len(), expected_product.len());
        for (actual, expected) in product.iter().zip(&expected_product) {
            let tolerance = (expected.abs() * 1e-5).max(1e-3);
            assert!((actual - expected).abs() < tolerance);
        }
    }

    #[test]
    fn test_simd_spectral_centroid() {
        let spectrum = vec![0.0, 1.0, 2.0, 1.0, 0.0];
        let sample_rate = 16000.0;

        let centroid = simd::spectral_centroid(&spectrum, sample_rate);

        let frequencies: Vec<f32> = (0..spectrum.len())
            .map(|i| i as f32 * sample_rate / (2.0 * spectrum.len() as f32))
            .collect();

        let weighted_sum: f32 = spectrum
            .iter()
            .zip(&frequencies)
            .map(|(mag, freq)| mag * freq)
            .sum();
        let magnitude_sum: f32 = spectrum.iter().sum();
        let expected = if magnitude_sum > 0.0 {
            weighted_sum / magnitude_sum
        } else {
            0.0
        };

        assert!((centroid - expected).abs() < 0.001);
    }

    #[test]
    fn test_simd_edge_cases() {
        let zeros = vec![0.0; 8];
        let ones = vec![1.0; 8];

        assert_eq!(simd::dot_product(&zeros, &ones), 0.0);
        assert_eq!(simd::rms(&zeros), 0.0);
        assert_eq!(simd::spectral_centroid(&zeros, 16000.0), 0.0);

        let single_a = vec![5.0];
        let single_b = vec![3.0];

        assert_eq!(simd::dot_product(&single_a, &single_b), 15.0);
        assert_eq!(simd::vector_add(&single_a, &single_b), vec![8.0]);
        assert_eq!(simd::vector_subtract(&single_a, &single_b), vec![2.0]);
    }

    #[test]
    fn test_simd_numerical_precision() {
        let a = vec![1e-6, 2e-6, 3e-6, 4e-6];
        let b = vec![1e-6, 2e-6, 3e-6, 4e-6];

        let dot = simd::dot_product(&a, &b);
        let expected = 1e-12 + 4e-12 + 9e-12 + 16e-12;
        assert!((dot - expected).abs() < 1e-15);

        let large_a = vec![1e6, 2e6, 3e6, 4e6];
        let large_b = vec![1e6, 2e6, 3e6, 4e6];

        let large_dot = simd::dot_product(&large_a, &large_b);
        let large_expected = 1e12 + 4e12 + 9e12 + 16e12;
        assert!((large_dot - large_expected).abs() < 1e9);
    }

    #[test]
    fn test_simd_performance_consistency() {
        let size = 100;
        let a: Vec<f32> = (0..size).map(|i| (i as f32 * 0.1).sin()).collect();
        let b: Vec<f32> = (0..size).map(|i| (i as f32 * 0.1).cos()).collect();

        let simd_dot = simd::dot_product(&a, &b);
        let scalar_dot: f32 = a.iter().zip(&b).map(|(x, y)| x * y).sum();

        assert!((simd_dot - scalar_dot).abs() < 1e-5);

        let simd_product = simd::element_wise_multiply(&a, &b);
        let scalar_product: Vec<f32> = a.iter().zip(&b).map(|(x, y)| x * y).collect();

        assert_eq!(simd_product.len(), scalar_product.len());
        for (simd_val, scalar_val) in simd_product.iter().zip(&scalar_product) {
            assert!((simd_val - scalar_val).abs() < 1e-6);
        }
    }

    #[test]
    fn test_performance_monitor() {
        let monitor = PerformanceMonitor::new();

        let result = monitor.time_operation("test_op", || {
            std::thread::sleep(std::time::Duration::from_millis(10));
            42
        });

        assert_eq!(result, 42);
        let avg_time = monitor.get_average_time("test_op").unwrap();
        assert!(avg_time >= std::time::Duration::from_millis(10));
    }

    #[test]
    fn test_gpu_accelerator_creation() {
        let accelerator = gpu::GpuAccelerator::default();

        // Either a real GPU was detected, or the accelerator fell back to CPU.
        assert!(matches!(accelerator.device(), Device::Cpu) || accelerator.is_gpu_available());
    }

    #[test]
    fn test_gpu_correlation() {
        let accelerator = gpu::GpuAccelerator::default();
        let x = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let y = vec![1.0, 2.0, 3.0, 4.0, 5.0];

        let correlation = accelerator.gpu_correlation(&x, &y).unwrap();
        assert!((correlation - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_gpu_autocorrelation() {
        let accelerator = gpu::GpuAccelerator::default();
        let signal = vec![1.0, 0.8, 0.6, 0.4, 0.2];

        let autocorr = accelerator.gpu_autocorrelation(&signal, 3).unwrap();
        assert_eq!(autocorr.len(), 4); // lags 0..=3
        assert!((autocorr[0] - 1.0).abs() < 0.001); // lag 0 is fully correlated
    }

    #[test]
    fn test_gpu_spectral_analysis() {
        let accelerator = gpu::GpuAccelerator::default();
        let signal = vec![0.1; 1024];
        let sample_rate = 16000.0;

        let features = accelerator
            .gpu_spectral_analysis(&signal, sample_rate)
            .unwrap();
        assert!(features.centroid >= 0.0);
        assert!(features.energy >= 0.0);
        assert!(features.rolloff >= 0.0);
        assert!(features.spread >= 0.0);
    }

    #[test]
    fn test_gpu_memory_manager() {
        // The CPU device keeps the test runnable without GPU hardware.
        let device = Device::Cpu;
        let manager = gpu::GpuMemoryManager::new(device, 10);

        let tensor = manager.allocate_tensor(&[10, 10]).unwrap();
        assert_eq!(tensor.shape().dims(), &[10, 10]);

        manager.clear_cache();
    }

    #[test]
    fn test_persistent_cache() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let cache_dir = temp_dir.path().join("test_cache");

        let cache = PersistentCache::new(&cache_dir, 10, 6).unwrap();

        assert!(cache
            .insert("key1".to_string(), "value1".to_string())
            .is_ok());
        assert_eq!(cache.get(&"key1".to_string()), Some("value1".to_string()));

        // A second instance over the same directory sees the persisted entry.
        let cache2 = PersistentCache::new(&cache_dir, 10, 6).unwrap();
        assert_eq!(cache2.get(&"key1".to_string()), Some("value1".to_string()));

        let stats = cache.stats();
        assert!(stats.disk_entries > 0);
        assert!(stats.cache_dir_size > 0);

        assert!(cache.clear().is_ok());
        assert!(cache.is_empty());
    }

    #[test]
    fn test_persistent_cache_compression() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let cache_dir = temp_dir.path().join("test_compression");

        let mut cache = PersistentCache::new(&cache_dir, 10, 1).unwrap();

        cache.set_compression_level(9);

        let large_value = "x".repeat(1000);
        assert!(cache
            .insert("large_key".to_string(), large_value.clone())
            .is_ok());
        assert_eq!(cache.get(&"large_key".to_string()), Some(large_value));

        let stats = cache.stats();
        // 1000 repeated bytes should compress well below their raw size.
        assert!(stats.cache_dir_size < 1000);
    }

    #[test]
    fn test_persistent_cache_memory_fallback() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let cache_dir = temp_dir.path().join("test_memory");

        // Memory tier capped at two entries; everything is also persisted.
        let cache = PersistentCache::new(&cache_dir, 2, 6).unwrap();

        assert!(cache
            .insert("key1".to_string(), "value1".to_string())
            .is_ok());
        assert!(cache
            .insert("key2".to_string(), "value2".to_string())
            .is_ok());

        // This insert overflows the memory tier.
        assert!(cache
            .insert("key3".to_string(), "value3".to_string())
            .is_ok());

        // All three keys remain reachable, falling back to disk when evicted.
        assert_eq!(cache.get(&"key1".to_string()), Some("value1".to_string()));
        assert_eq!(cache.get(&"key2".to_string()), Some("value2".to_string()));
        assert_eq!(cache.get(&"key3".to_string()), Some("value3".to_string()));

        let stats = cache.stats();
        assert_eq!(stats.disk_entries, 3);
    }

    #[test]
    fn test_multi_gpu_manager_creation() {
        use multi_gpu::MultiGpuManager;

        let manager = MultiGpuManager::new(4).unwrap();

        // At minimum the CPU fallback device is registered.
        assert!(manager.device_count() >= 1);

        let info = manager.device_info();
        assert!(!info.is_empty());
        assert!(!info[0].is_empty());
    }

    #[tokio::test]
    async fn test_multi_gpu_distributed_correlation() {
        use multi_gpu::MultiGpuManager;

        let manager = MultiGpuManager::new(2).unwrap();

        let data_pairs = vec![
            (vec![1.0, 2.0, 3.0], vec![1.0, 2.0, 3.0]),
            (vec![4.0, 5.0, 6.0], vec![4.0, 5.0, 6.0]),
            (vec![1.0, 0.0, -1.0], vec![-1.0, 0.0, 1.0]),
        ];

        let correlations = manager
            .distributed_correlation_batch(&data_pairs)
            .await
            .unwrap();

        assert_eq!(correlations.len(), 3);
        assert!((correlations[0] - 1.0).abs() < 0.001); // identical inputs
        assert!((correlations[1] - 1.0).abs() < 0.001); // identical inputs
        assert!((correlations[2] + 1.0).abs() < 0.001); // perfectly anti-correlated
    }

    #[tokio::test]
    async fn test_multi_gpu_distributed_fft() {
        use multi_gpu::MultiGpuManager;

        let manager = MultiGpuManager::new(2).unwrap();

        let signals = vec![vec![1.0, 0.0, -1.0, 0.0], vec![0.5, 1.0, 0.5, 0.0]];

        let spectra = manager.distributed_fft_batch(&signals).await.unwrap();

        assert_eq!(spectra.len(), 2);
        assert!(!spectra[0].is_empty());
        assert!(!spectra[1].is_empty());
    }

    #[tokio::test]
    async fn test_multi_gpu_distributed_spectral_analysis() {
        use multi_gpu::MultiGpuManager;

        let manager = MultiGpuManager::new(2).unwrap();

        let signals = vec![vec![0.1; 512], vec![0.2; 512]];
        let sample_rate = 16000.0;

        let features = manager
            .distributed_spectral_analysis(&signals, sample_rate)
            .await
            .unwrap();

        assert_eq!(features.len(), 2);
        for feature in &features {
            assert!(feature.centroid >= 0.0);
            assert!(feature.energy >= 0.0);
            assert!(feature.rolloff >= 0.0);
        }
    }

    #[test]
    fn test_multi_gpu_metrics() {
        use multi_gpu::MultiGpuMetrics;

        let mut metrics = MultiGpuMetrics::new(3);

        // A perfectly even distribution scores 1.0.
        metrics.operations_per_device = vec![10, 10, 10];
        let efficiency = metrics.load_balance_efficiency();
        assert!((efficiency - 1.0).abs() < 0.1);

        // A skewed distribution lowers the efficiency.
        metrics.operations_per_device = vec![20, 5, 5];
        let efficiency = metrics.load_balance_efficiency();
        assert!(efficiency < 1.0);
    }

    #[test]
    fn test_multi_gpu_load_balancing() {
        use multi_gpu::MultiGpuManager;

        // Two CPU devices are enough to observe the round-robin behavior.
        let devices = vec![Device::Cpu, Device::Cpu];
        let manager = MultiGpuManager::with_devices(devices, 4);

        let device1 = manager.get_next_device();
        let _device2 = manager.get_next_device();
        let device3 = manager.get_next_device();

        // With two devices, the third request wraps back to the first one.
        assert!(std::ptr::eq(device1, device3));
    }
}