// voirs_evaluation/performance.rs

//! Performance optimization utilities
//!
//! This module provides performance optimizations including:
//! - Parallel processing utilities
//! - Memory optimization helpers
//! - SIMD-accelerated computations
//! - GPU acceleration framework
//! - Caching mechanisms

use parking_lot::RwLock;
use scirs2_core::parallel_ops::*;
use std::collections::HashMap;
use std::hash::Hash;
use std::path::Path;
use std::sync::Arc;

// GPU acceleration imports
use candle_core::{Device, Result as CandleResult, Tensor};

// Re-export GPU module
pub use gpu::{GpuAccelerator, GpuMemoryManager, SpectralFeatures as GpuSpectralFeatures};

/// Memory-efficient chunked processing for large audio arrays
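///
/// # Examples
///
/// A minimal usage sketch (illustrative only; the `voirs_evaluation::performance`
/// module path is assumed from the file location):
///
/// ```ignore
/// use voirs_evaluation::performance::process_audio_chunks;
///
/// let samples = vec![0.1f32; 4096];
/// // Compute the peak absolute value of each 1024-sample chunk in parallel.
/// let peaks: Vec<f32> = process_audio_chunks(&samples, 1024, |chunk| {
///     chunk.iter().fold(0.0f32, |m, &x| m.max(x.abs()))
/// });
/// assert_eq!(peaks.len(), 4);
/// ```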
24pub fn process_audio_chunks<T, F, R>(data: &[T], chunk_size: usize, processor: F) -> Vec<R>
25where
26    T: Send + Sync,
27    F: Fn(&[T]) -> R + Send + Sync,
28    R: Send,
29{
30    if data.len() <= chunk_size {
31        return vec![processor(data)];
32    }
33
34    data.par_chunks(chunk_size).map(processor).collect()
35}
36
/// Parallel Pearson correlation calculation (parallel reduction for large inputs,
/// sequential fallback for small ones)
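///
/// # Examples
///
/// A minimal usage sketch (illustrative only; the `voirs_evaluation::performance`
/// module path is assumed from the file location):
///
/// ```ignore
/// use voirs_evaluation::performance::parallel_correlation;
///
/// let x: Vec<f32> = (0..2000).map(|i| i as f32).collect();
/// let y: Vec<f32> = x.iter().map(|v| 2.0 * v + 1.0).collect();
/// // Perfectly linearly related signals correlate to ~1.0.
/// let r = parallel_correlation(&x, &y);
/// assert!((r - 1.0).abs() < 1e-3);
/// ```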
38#[must_use]
39pub fn parallel_correlation(x: &[f32], y: &[f32]) -> f32 {
40    if x.len() != y.len() || x.is_empty() {
41        return 0.0;
42    }
43
44    // Use parallel reduction for large arrays
45    if x.len() > 1000 {
46        let (sum_xy, sum_x, sum_y, sum_x2, sum_y2) = x
47            .par_iter()
48            .zip(y.par_iter())
49            .map(|(&xi, &yi)| (xi * yi, xi, yi, xi * xi, yi * yi))
50            .reduce(
51                || (0.0, 0.0, 0.0, 0.0, 0.0),
52                |acc, item| {
53                    (
54                        acc.0 + item.0,
55                        acc.1 + item.1,
56                        acc.2 + item.2,
57                        acc.3 + item.3,
58                        acc.4 + item.4,
59                    )
60                },
61            );
62
63        let n = x.len() as f32;
64        let numerator = n * sum_xy - sum_x * sum_y;
65        let denominator = ((n * sum_x2 - sum_x * sum_x) * (n * sum_y2 - sum_y * sum_y)).sqrt();
66
67        if denominator > f32::EPSILON {
68            numerator / denominator
69        } else {
70            0.0
71        }
72    } else {
73        // Use sequential for small arrays
74        crate::calculate_correlation(x, y)
75    }
76}
77
/// Parallel FFT-based magnitude spectrum computation for batch processing
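///
/// # Examples
///
/// A minimal usage sketch (illustrative only; the `voirs_evaluation::performance`
/// module path is assumed from the file location):
///
/// ```ignore
/// use voirs_evaluation::performance::parallel_fft_batch;
///
/// let signals = vec![vec![0.0f32; 512], vec![1.0f32; 1024]];
/// // Each output row is the magnitude spectrum of the corresponding input signal.
/// let spectra = parallel_fft_batch(&signals);
/// assert_eq!(spectra.len(), 2);
/// ```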
79#[must_use]
80pub fn parallel_fft_batch(signals: &[Vec<f32>]) -> Vec<Vec<f32>> {
81    use scirs2_fft::{RealFftPlanner, RealToComplex};
82    use std::sync::Mutex;
83
84    let planner = Arc::new(Mutex::new(RealFftPlanner::<f32>::new()));
85
86    signals
87        .par_iter()
88        .map(|signal| {
89            if signal.is_empty() {
90                return Vec::new();
91            }
92
93            let mut planner_guard = planner.lock().unwrap();
94            let fft = planner_guard.plan_fft_forward(signal.len());
95            drop(planner_guard);
96
97            let mut indata = signal.clone();
98            let mut spectrum = vec![scirs2_core::Complex::new(0.0, 0.0); fft.output_len()];
99
100            fft.process(&indata, &mut spectrum);
101
102            // Convert to magnitude spectrum
103            spectrum.iter().map(|c| c.norm()).collect()
104        })
105        .collect()
106}
107
/// High-performance autocorrelation with parallel computation
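///
/// # Examples
///
/// A minimal usage sketch (illustrative only; the `voirs_evaluation::performance`
/// module path is assumed from the file location):
///
/// ```ignore
/// use voirs_evaluation::performance::parallel_autocorrelation;
///
/// let signal: Vec<f32> = (0..4096).map(|i| (i as f32 * 0.1).sin()).collect();
/// let acf = parallel_autocorrelation(&signal, 256);
/// // Lag 0 is the signal's normalized self-correlation, i.e. 1.0.
/// assert!((acf[0] - 1.0).abs() < 1e-4);
/// ```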
109#[must_use]
110pub fn parallel_autocorrelation(signal: &[f32], max_lag: usize) -> Vec<f32> {
111    if signal.len() < 2 || max_lag == 0 {
112        return vec![0.0; max_lag + 1];
113    }
114
115    let effective_max_lag = max_lag.min(signal.len() - 1);
116
117    (0..=effective_max_lag)
118        .into_par_iter()
119        .map(|lag| {
120            let mut correlation = 0.0;
121            let mut norm1 = 0.0;
122            let mut norm2 = 0.0;
123
124            for i in 0..(signal.len() - lag) {
125                correlation += signal[i] * signal[i + lag];
126                norm1 += signal[i] * signal[i];
127                if lag == 0 {
128                    norm2 = norm1;
129                } else {
130                    norm2 += signal[i + lag] * signal[i + lag];
131                }
132            }
133
134            if norm1 > 0.0 && norm2 > 0.0 {
135                correlation / (norm1 * norm2).sqrt()
136            } else {
137                0.0
138            }
139        })
140        .collect()
141}
142
/// Thread-safe cache for expensive computations; eviction simply clears the map
/// once capacity is reached rather than tracking least-recently-used entries.
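///
/// # Examples
///
/// A minimal usage sketch (illustrative only; the `voirs_evaluation::performance`
/// module path is assumed from the file location):
///
/// ```ignore
/// use voirs_evaluation::performance::LRUCache;
///
/// let cache: LRUCache<String, Vec<f32>> = LRUCache::new(128);
/// cache.insert("mfcc:utt_001".to_string(), vec![0.1, 0.2, 0.3]);
/// assert_eq!(cache.get(&"mfcc:utt_001".to_string()), Some(vec![0.1, 0.2, 0.3]));
/// ```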
144pub struct LRUCache<K, V> {
145    map: Arc<RwLock<HashMap<K, V>>>,
146    max_size: usize,
147}
148
149impl<K, V> LRUCache<K, V>
150where
151    K: Eq + Hash + Clone,
152    V: Clone,
153{
154    /// Create a new LRU cache with the specified maximum size
155    #[must_use]
156    pub fn new(max_size: usize) -> Self {
157        Self {
158            map: Arc::new(RwLock::new(HashMap::new())),
159            max_size,
160        }
161    }
162
163    /// Get a value from the cache
164    pub fn get(&self, key: &K) -> Option<V> {
165        self.map.read().get(key).cloned()
166    }
167
168    /// Insert a key-value pair into the cache
169    pub fn insert(&self, key: K, value: V) {
170        let mut map = self.map.write();
171
172        // Simple eviction strategy - clear if at capacity
173        if map.len() >= self.max_size {
174            map.clear();
175        }
176
177        map.insert(key, value);
178    }
179
180    /// Clear all entries from the cache
181    pub fn clear(&self) {
182        self.map.write().clear();
183    }
184
185    /// Get the current number of entries in the cache
186    #[must_use]
187    pub fn len(&self) -> usize {
188        self.map.read().len()
189    }
190
191    /// Check if the cache is empty
192    #[must_use]
193    pub fn is_empty(&self) -> bool {
194        self.map.read().is_empty()
195    }
196}
197
/// Persistent cache with compression support
///
/// This cache stores data to disk with compression for efficient storage
/// and retrieval across application restarts.
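///
/// # Examples
///
/// A minimal usage sketch (illustrative only; the `voirs_evaluation::performance`
/// module path and the cache directory are assumptions for the example):
///
/// ```ignore
/// use voirs_evaluation::performance::PersistentCache;
///
/// let cache: PersistentCache<String, Vec<f32>> =
///     PersistentCache::new("/tmp/voirs_cache", 256, 6).unwrap();
/// cache.insert("features:utt_001".to_string(), vec![1.0, 2.0]).unwrap();
/// assert!(cache.get(&"features:utt_001".to_string()).is_some());
/// ```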
202pub struct PersistentCache<K, V> {
203    memory_cache: LRUCache<K, V>,
204    cache_dir: std::path::PathBuf,
205    compression_level: u32,
206}
207
208impl<K, V> PersistentCache<K, V>
209where
210    K: Eq + Hash + Clone + serde::Serialize + serde::de::DeserializeOwned,
211    V: Clone + serde::Serialize + serde::de::DeserializeOwned,
212{
213    /// Create a new persistent cache with the specified directory and settings
214    pub fn new<P: AsRef<Path>>(
215        cache_dir: P,
216        max_memory_size: usize,
217        compression_level: u32,
218    ) -> Result<Self, std::io::Error> {
219        let cache_dir = cache_dir.as_ref().to_path_buf();
220
221        // Create cache directory if it doesn't exist
222        if !cache_dir.exists() {
223            std::fs::create_dir_all(&cache_dir)?;
224        }
225
226        Ok(Self {
227            memory_cache: LRUCache::new(max_memory_size),
228            cache_dir,
229            compression_level: compression_level.clamp(0, 9),
230        })
231    }
232
233    /// Get a value from the cache (checks memory first, then disk)
234    pub fn get(&self, key: &K) -> Option<V> {
235        // Check memory cache first
236        if let Some(value) = self.memory_cache.get(key) {
237            return Some(value);
238        }
239
240        // Try to load from disk
241        if let Ok(value) = self.load_from_disk(key) {
242            // Cache in memory for future access
243            self.memory_cache.insert(key.clone(), value.clone());
244            return Some(value);
245        }
246
247        None
248    }
249
250    /// Insert a key-value pair into the cache (saves to both memory and disk)
251    pub fn insert(&self, key: K, value: V) -> Result<(), std::io::Error> {
252        // Save to memory cache
253        self.memory_cache.insert(key.clone(), value.clone());
254
255        // Save to disk with compression
256        self.save_to_disk(&key, &value)
257    }
258
259    /// Clear all entries from the cache
260    pub fn clear(&self) -> Result<(), std::io::Error> {
261        self.memory_cache.clear();
262
263        // Remove all files in cache directory
264        for entry in std::fs::read_dir(&self.cache_dir)? {
265            let entry = entry?;
266            if entry.path().is_file() {
267                std::fs::remove_file(entry.path())?;
268            }
269        }
270
271        Ok(())
272    }
273
274    /// Get the current number of entries in memory cache
275    #[must_use]
276    pub fn memory_len(&self) -> usize {
277        self.memory_cache.len()
278    }
279
280    /// Get the total number of entries (including disk)
281    pub fn total_len(&self) -> usize {
282        std::fs::read_dir(&self.cache_dir)
283            .map(|entries| entries.filter_map(|e| e.ok()).count())
284            .unwrap_or(0)
285    }
286
287    /// Check if the cache is empty
288    #[must_use]
289    pub fn is_empty(&self) -> bool {
290        self.memory_cache.is_empty() && self.total_len() == 0
291    }
292
293    /// Get cache statistics
294    pub fn stats(&self) -> CacheStats {
295        CacheStats {
296            memory_entries: self.memory_cache.len(),
297            disk_entries: self.total_len(),
298            cache_dir_size: self.calculate_cache_dir_size(),
299        }
300    }
301
302    /// Set compression level (0-9, where 9 is highest compression)
303    pub fn set_compression_level(&mut self, level: u32) {
304        self.compression_level = level.clamp(0, 9);
305    }
306
307    // Private helper methods
308
309    fn cache_key_to_filename(&self, key: &K) -> Result<String, std::io::Error> {
310        let serialized = bincode::serde::encode_to_vec(key, bincode::config::standard())
311            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
312        let hash = {
313            use std::collections::hash_map::DefaultHasher;
314            use std::hash::Hasher;
315            let mut hasher = DefaultHasher::new();
316            hasher.write(&serialized);
317            hasher.finish()
318        };
319        Ok(format!("{:x}.cache", hash))
320    }
321
322    fn save_to_disk(&self, key: &K, value: &V) -> Result<(), std::io::Error> {
323        let filename = self.cache_key_to_filename(key)?;
324        let filepath = self.cache_dir.join(filename);
325
326        let serialized = bincode::serde::encode_to_vec(value, bincode::config::standard())
327            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
328
329        // Compress the data using flate2
330        let mut encoder = flate2::write::GzEncoder::new(
331            Vec::new(),
332            flate2::Compression::new(self.compression_level),
333        );
334        std::io::Write::write_all(&mut encoder, &serialized)?;
335        let compressed = encoder.finish()?;
336
337        std::fs::write(filepath, compressed)
338    }
339
340    fn load_from_disk(&self, key: &K) -> Result<V, std::io::Error> {
341        let filename = self.cache_key_to_filename(key)?;
342        let filepath = self.cache_dir.join(filename);
343
344        if !filepath.exists() {
345            return Err(std::io::Error::new(
346                std::io::ErrorKind::NotFound,
347                "Cache entry not found",
348            ));
349        }
350
351        let compressed_data = std::fs::read(filepath)?;
352
353        // Decompress the data
354        let mut decoder = flate2::read::GzDecoder::new(&compressed_data[..]);
355        let mut decompressed = Vec::new();
356        std::io::Read::read_to_end(&mut decoder, &mut decompressed)?;
357
358        bincode::serde::decode_from_slice(&decompressed, bincode::config::standard())
359            .map(|(v, _)| v)
360            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
361    }
362
363    fn calculate_cache_dir_size(&self) -> u64 {
364        std::fs::read_dir(&self.cache_dir)
365            .map(|entries| {
366                entries
367                    .filter_map(|entry| entry.ok().and_then(|e| e.metadata().ok()).map(|m| m.len()))
368                    .sum()
369            })
370            .unwrap_or(0)
371    }
372}
373
374/// Cache statistics
375#[derive(Debug, Clone)]
376pub struct CacheStats {
377    /// Number of entries in memory cache
378    pub memory_entries: usize,
379    /// Number of entries on disk
380    pub disk_entries: usize,
381    /// Total size of cache directory in bytes
382    pub cache_dir_size: u64,
383}
384
/// Memory-efficient sliding window processor
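///
/// # Examples
///
/// A minimal usage sketch (illustrative only; the `voirs_evaluation::performance`
/// module path is assumed from the file location):
///
/// ```ignore
/// use voirs_evaluation::performance::SlidingWindowProcessor;
///
/// let processor = SlidingWindowProcessor::<f32>::new(1024, 256);
/// let samples = vec![0.5f32; 8192];
/// // Energy of each 1024-sample window, hopping by 256 samples, computed in parallel.
/// let energies: Vec<f32> =
///     processor.process_parallel(&samples, |w| w.iter().map(|x| x * x).sum());
/// ```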
386pub struct SlidingWindowProcessor<T> {
387    window_size: usize,
388    hop_size: usize,
389    buffer: Vec<T>,
390}
391
392impl<T> SlidingWindowProcessor<T>
393where
394    T: Clone + Default,
395{
396    /// Create a new sliding window processor with specified window and hop sizes
397    #[must_use]
398    pub fn new(window_size: usize, hop_size: usize) -> Self {
399        Self {
400            window_size,
401            hop_size,
402            buffer: Vec::with_capacity(window_size),
403        }
404    }
405
406    /// Process data in sliding windows with parallel computation
407    pub fn process_parallel<F, R>(&self, data: &[T], processor: F) -> Vec<R>
408    where
409        T: Send + Sync,
410        F: Fn(&[T]) -> R + Send + Sync,
411        R: Send,
412    {
413        if data.len() < self.window_size {
414            return Vec::new();
415        }
416
417        let num_windows = (data.len() - self.window_size) / self.hop_size + 1;
418
419        (0..num_windows)
420            .into_par_iter()
421            .map(|i| {
422                let start = i * self.hop_size;
423                let end = (start + self.window_size).min(data.len());
424                processor(&data[start..end])
425            })
426            .collect()
427    }
428}
429
430/// SIMD-accelerated vector operations using hardware-specific instructions
431pub mod simd {
432    use scirs2_core::parallel_ops::*;
433
    /// SIMD dot product with hardware acceleration when available
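    ///
    /// # Examples
    ///
    /// A minimal usage sketch (illustrative only; the `voirs_evaluation::performance::simd`
    /// path is assumed from the file location):
    ///
    /// ```ignore
    /// use voirs_evaluation::performance::simd;
    ///
    /// let a = vec![1.0f32, 2.0, 3.0];
    /// let b = vec![4.0f32, 5.0, 6.0];
    /// // 1*4 + 2*5 + 3*6 = 32
    /// assert_eq!(simd::dot_product(&a, &b), 32.0);
    /// ```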
435    #[must_use]
436    pub fn dot_product(a: &[f32], b: &[f32]) -> f32 {
437        if a.len() != b.len() {
438            return 0.0;
439        }
440
441        // Use hardware SIMD for smaller arrays, parallel processing for larger ones
442        if a.len() > 1000 {
443            a.par_iter().zip(b.par_iter()).map(|(&x, &y)| x * y).sum()
444        } else {
445            dot_product_simd(a, b)
446        }
447    }
448
449    /// Element-wise multiplication with SIMD optimization
450    #[must_use]
451    pub fn element_wise_multiply(a: &[f32], b: &[f32]) -> Vec<f32> {
452        if a.len() != b.len() {
453            return Vec::new();
454        }
455
456        if a.len() > 1000 {
457            a.par_iter()
458                .zip(b.par_iter())
459                .map(|(&x, &y)| x * y)
460                .collect()
461        } else {
462            element_wise_multiply_simd(a, b)
463        }
464    }
465
466    /// SIMD-optimized RMS calculation
467    #[must_use]
468    pub fn rms(signal: &[f32]) -> f32 {
469        if signal.is_empty() {
470            return 0.0;
471        }
472
473        let sum_squares = if signal.len() > 1000 {
474            signal.par_iter().map(|&x| x * x).sum::<f32>()
475        } else {
476            rms_simd(signal)
477        };
478
479        (sum_squares / signal.len() as f32).sqrt()
480    }
481
482    /// Spectral centroid with SIMD-optimized frequency weighting
483    #[must_use]
484    pub fn spectral_centroid(spectrum: &[f32], sample_rate: f32) -> f32 {
485        if spectrum.is_empty() {
486            return 0.0;
487        }
488
489        let (weighted_sum, magnitude_sum) = if spectrum.len() > 500 {
490            spectrum
491                .par_iter()
492                .enumerate()
493                .map(|(i, &magnitude)| {
494                    let frequency = i as f32 * sample_rate / (2.0 * spectrum.len() as f32);
495                    (frequency * magnitude, magnitude)
496                })
497                .reduce(|| (0.0, 0.0), |acc, item| (acc.0 + item.0, acc.1 + item.1))
498        } else {
499            spectral_centroid_simd(spectrum, sample_rate)
500        };
501
502        if magnitude_sum > 0.0 {
503            weighted_sum / magnitude_sum
504        } else {
505            0.0
506        }
507    }
508
509    /// Vector addition with SIMD optimization
510    #[must_use]
511    pub fn vector_add(a: &[f32], b: &[f32]) -> Vec<f32> {
512        if a.len() != b.len() {
513            return Vec::new();
514        }
515
516        if a.len() > 1000 {
517            a.par_iter()
518                .zip(b.par_iter())
519                .map(|(&x, &y)| x + y)
520                .collect()
521        } else {
522            vector_add_simd(a, b)
523        }
524    }
525
526    /// Vector subtraction with SIMD optimization
527    #[must_use]
528    pub fn vector_subtract(a: &[f32], b: &[f32]) -> Vec<f32> {
529        if a.len() != b.len() {
530            return Vec::new();
531        }
532
533        if a.len() > 1000 {
534            a.par_iter()
535                .zip(b.par_iter())
536                .map(|(&x, &y)| x - y)
537                .collect()
538        } else {
539            vector_subtract_simd(a, b)
540        }
541    }
542
543    /// x86 SSE/AVX optimized implementations
544    #[cfg(target_arch = "x86_64")]
545    mod x86_impl {
546        #[cfg(target_feature = "avx")]
547        pub fn dot_product_avx(a: &[f32], b: &[f32]) -> f32 {
548            use std::arch::x86_64::*;
549
550            let len = a.len();
551            let chunks = len / 8;
552            let mut sum = unsafe { _mm256_setzero_ps() };
553
554            unsafe {
555                for i in 0..chunks {
556                    let a_vec = _mm256_loadu_ps(a.as_ptr().add(i * 8));
557                    let b_vec = _mm256_loadu_ps(b.as_ptr().add(i * 8));
558                    let mul = _mm256_mul_ps(a_vec, b_vec);
559                    sum = _mm256_add_ps(sum, mul);
560                }
561
562                // Horizontal sum
563                let sum_low = _mm256_extractf128_ps(sum, 0);
564                let sum_high = _mm256_extractf128_ps(sum, 1);
565                let sum_128 = _mm_add_ps(sum_low, sum_high);
566
567                let sum_64 = _mm_add_ps(sum_128, _mm_movehl_ps(sum_128, sum_128));
568                let sum_32 = _mm_add_ss(sum_64, _mm_shuffle_ps(sum_64, sum_64, 1));
569
570                let mut result = _mm_cvtss_f32(sum_32);
571
572                // Handle remaining elements
573                for i in (chunks * 8)..len {
574                    result += a[i] * b[i];
575                }
576
577                result
578            }
579        }
580
581        #[cfg(target_feature = "sse")]
582        pub fn dot_product_sse(a: &[f32], b: &[f32]) -> f32 {
583            use std::arch::x86_64::*;
584
585            let len = a.len();
586            let chunks = len / 4;
587            let mut sum = unsafe { _mm_setzero_ps() };
588
589            unsafe {
590                for i in 0..chunks {
591                    let a_vec = _mm_loadu_ps(a.as_ptr().add(i * 4));
592                    let b_vec = _mm_loadu_ps(b.as_ptr().add(i * 4));
593                    let mul = _mm_mul_ps(a_vec, b_vec);
594                    sum = _mm_add_ps(sum, mul);
595                }
596
597                // Horizontal sum
598                let sum_64 = _mm_add_ps(sum, _mm_movehl_ps(sum, sum));
599                let sum_32 = _mm_add_ss(sum_64, _mm_shuffle_ps(sum_64, sum_64, 1));
600
601                let mut result = _mm_cvtss_f32(sum_32);
602
603                // Handle remaining elements
604                for i in (chunks * 4)..len {
605                    result += a[i] * b[i];
606                }
607
608                result
609            }
610        }
611
612        #[cfg(target_feature = "avx")]
613        pub fn element_wise_multiply_avx(a: &[f32], b: &[f32]) -> Vec<f32> {
614            use std::arch::x86_64::*;
615
616            let len = a.len();
617            let chunks = len / 8;
618            let mut result: Vec<f32> = Vec::with_capacity(len);
619
620            unsafe {
621                for i in 0..chunks {
622                    let a_vec = _mm256_loadu_ps(a.as_ptr().add(i * 8));
623                    let b_vec = _mm256_loadu_ps(b.as_ptr().add(i * 8));
624                    let mul = _mm256_mul_ps(a_vec, b_vec);
625
626                    let ptr = result.as_mut_ptr().add(i * 8);
627                    _mm256_storeu_ps(ptr, mul);
628                }
629
630                result.set_len(chunks * 8);
631
632                // Handle remaining elements
633                for i in (chunks * 8)..len {
634                    result.push(a[i] * b[i]);
635                }
636            }
637
638            result
639        }
640    }
641
642    /// ARM NEON optimized implementations
643    #[cfg(target_arch = "aarch64")]
644    mod arm_impl {
645        #[cfg(target_feature = "neon")]
646        pub fn dot_product_neon(a: &[f32], b: &[f32]) -> f32 {
647            use std::arch::aarch64::*;
648
649            let len = a.len();
650            let chunks = len / 4;
651            let mut sum = unsafe { vdupq_n_f32(0.0) };
652
653            unsafe {
654                for i in 0..chunks {
655                    let a_vec = vld1q_f32(a.as_ptr().add(i * 4));
656                    let b_vec = vld1q_f32(b.as_ptr().add(i * 4));
657                    let mul = vmulq_f32(a_vec, b_vec);
658                    sum = vaddq_f32(sum, mul);
659                }
660
661                // Horizontal sum
662                let sum_pair = vadd_f32(vget_high_f32(sum), vget_low_f32(sum));
663                let result_vec = vpadd_f32(sum_pair, sum_pair);
664                let mut result = vget_lane_f32(result_vec, 0);
665
666                // Handle remaining elements
667                for i in (chunks * 4)..len {
668                    result += a[i] * b[i];
669                }
670
671                result
672            }
673        }
674
675        #[cfg(target_feature = "neon")]
676        pub fn element_wise_multiply_neon(a: &[f32], b: &[f32]) -> Vec<f32> {
677            use std::arch::aarch64::*;
678
679            let len = a.len();
680            let chunks = len / 4;
681            let mut result: Vec<f32> = Vec::with_capacity(len);
682
683            unsafe {
684                for i in 0..chunks {
685                    let a_vec = vld1q_f32(a.as_ptr().add(i * 4));
686                    let b_vec = vld1q_f32(b.as_ptr().add(i * 4));
687                    let mul = vmulq_f32(a_vec, b_vec);
688
689                    vst1q_f32(result.as_mut_ptr().add(i * 4), mul);
690                }
691
692                result.set_len(chunks * 4);
693
694                // Handle remaining elements
695                for i in (chunks * 4)..len {
696                    result.push(a[i] * b[i]);
697                }
698            }
699
700            result
701        }
702    }
703
704    /// Platform-specific SIMD dispatch
705    #[inline]
706    fn dot_product_simd(a: &[f32], b: &[f32]) -> f32 {
707        #[cfg(all(target_arch = "x86_64", target_feature = "avx"))]
708        {
709            x86_impl::dot_product_avx(a, b)
710        }
711        #[cfg(all(
712            target_arch = "x86_64",
713            target_feature = "sse",
714            not(target_feature = "avx")
715        ))]
716        {
717            x86_impl::dot_product_sse(a, b)
718        }
719        #[cfg(all(target_arch = "aarch64", target_feature = "neon"))]
720        {
721            arm_impl::dot_product_neon(a, b)
722        }
723        #[cfg(not(any(
724            all(
725                target_arch = "x86_64",
726                any(target_feature = "avx", target_feature = "sse")
727            ),
728            all(target_arch = "aarch64", target_feature = "neon")
729        )))]
730        {
731            // Fallback to scalar implementation
732            a.iter().zip(b.iter()).map(|(&x, &y)| x * y).sum()
733        }
734    }
735
736    #[inline]
737    fn element_wise_multiply_simd(a: &[f32], b: &[f32]) -> Vec<f32> {
738        #[cfg(all(target_arch = "x86_64", target_feature = "avx"))]
739        {
740            x86_impl::element_wise_multiply_avx(a, b)
741        }
742        #[cfg(all(target_arch = "aarch64", target_feature = "neon"))]
743        {
744            arm_impl::element_wise_multiply_neon(a, b)
745        }
746        #[cfg(not(any(
747            all(target_arch = "x86_64", target_feature = "avx"),
748            all(target_arch = "aarch64", target_feature = "neon")
749        )))]
750        {
751            // Fallback to scalar implementation
752            a.iter().zip(b.iter()).map(|(&x, &y)| x * y).collect()
753        }
754    }
755
756    #[inline]
757    fn rms_simd(signal: &[f32]) -> f32 {
758        // Use SIMD dot product for sum of squares calculation
759        dot_product_simd(signal, signal)
760    }
761
762    #[inline]
763    fn spectral_centroid_simd(spectrum: &[f32], sample_rate: f32) -> (f32, f32) {
764        spectrum
765            .iter()
766            .enumerate()
767            .map(|(i, &magnitude)| {
768                let frequency = i as f32 * sample_rate / (2.0 * spectrum.len() as f32);
769                (frequency * magnitude, magnitude)
770            })
771            .fold((0.0, 0.0), |acc, item| (acc.0 + item.0, acc.1 + item.1))
772    }
773
774    #[inline]
775    fn vector_add_simd(a: &[f32], b: &[f32]) -> Vec<f32> {
776        // Similar pattern to element_wise_multiply but with addition
777        #[cfg(all(target_arch = "x86_64", target_feature = "avx"))]
778        {
779            use std::arch::x86_64::*;
780
781            let len = a.len();
782            let chunks = len / 8;
783            let mut result: Vec<f32> = Vec::with_capacity(len);
784
785            unsafe {
786                for i in 0..chunks {
787                    let a_vec = _mm256_loadu_ps(a.as_ptr().add(i * 8));
788                    let b_vec = _mm256_loadu_ps(b.as_ptr().add(i * 8));
789                    let add = _mm256_add_ps(a_vec, b_vec);
790
791                    let ptr = result.as_mut_ptr().add(i * 8);
792                    _mm256_storeu_ps(ptr, add);
793                }
794
795                result.set_len(chunks * 8);
796
797                // Handle remaining elements
798                for i in (chunks * 8)..len {
799                    result.push(a[i] + b[i]);
800                }
801            }
802
803            result
804        }
805        #[cfg(not(all(target_arch = "x86_64", target_feature = "avx")))]
806        {
807            a.iter().zip(b.iter()).map(|(&x, &y)| x + y).collect()
808        }
809    }
810
811    #[inline]
812    fn vector_subtract_simd(a: &[f32], b: &[f32]) -> Vec<f32> {
813        // Similar pattern to vector_add but with subtraction
814        #[cfg(all(target_arch = "x86_64", target_feature = "avx"))]
815        {
816            use std::arch::x86_64::*;
817
818            let len = a.len();
819            let chunks = len / 8;
820            let mut result: Vec<f32> = Vec::with_capacity(len);
821
822            unsafe {
823                for i in 0..chunks {
824                    let a_vec = _mm256_loadu_ps(a.as_ptr().add(i * 8));
825                    let b_vec = _mm256_loadu_ps(b.as_ptr().add(i * 8));
826                    let sub = _mm256_sub_ps(a_vec, b_vec);
827
828                    let ptr = result.as_mut_ptr().add(i * 8);
829                    _mm256_storeu_ps(ptr, sub);
830                }
831
832                result.set_len(chunks * 8);
833
834                // Handle remaining elements
835                for i in (chunks * 8)..len {
836                    result.push(a[i] - b[i]);
837                }
838            }
839
840            result
841        }
842        #[cfg(not(all(target_arch = "x86_64", target_feature = "avx")))]
843        {
844            a.iter().zip(b.iter()).map(|(&x, &y)| x - y).collect()
845        }
846    }
847}
848
/// Performance monitoring utilities
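///
/// # Examples
///
/// A minimal usage sketch (illustrative only; the `voirs_evaluation::performance`
/// module path is assumed from the file location):
///
/// ```ignore
/// use voirs_evaluation::performance::PerformanceMonitor;
///
/// let monitor = PerformanceMonitor::new();
/// // Time a named operation and record its duration.
/// let sum: u64 = monitor.time_operation("sum_1_to_1000", || (1..=1000u64).sum());
/// assert_eq!(sum, 500_500);
/// assert!(monitor.get_average_time("sum_1_to_1000").is_some());
/// ```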
850pub struct PerformanceMonitor {
851    timings: Arc<RwLock<HashMap<String, Vec<std::time::Duration>>>>,
852}
853
854impl PerformanceMonitor {
855    /// Create a new performance monitor
856    #[must_use]
857    pub fn new() -> Self {
858        Self {
859            timings: Arc::new(RwLock::new(HashMap::new())),
860        }
861    }
862
863    /// Time an operation and record the duration
864    pub fn time_operation<F, R>(&self, name: &str, operation: F) -> R
865    where
866        F: FnOnce() -> R,
867    {
868        let start = std::time::Instant::now();
869        let result = operation();
870        let duration = start.elapsed();
871
872        self.timings
873            .write()
874            .entry(name.to_string())
875            .or_default()
876            .push(duration);
877
878        result
879    }
880
881    /// Get the average time for a named operation
882    #[must_use]
883    pub fn get_average_time(&self, name: &str) -> Option<std::time::Duration> {
884        let timings = self.timings.read();
885        if let Some(times) = timings.get(name) {
886            if times.is_empty() {
887                None
888            } else {
889                let total: std::time::Duration = times.iter().sum();
890                Some(total / times.len() as u32)
891            }
892        } else {
893            None
894        }
895    }
896
897    /// Clear all recorded timings
898    pub fn clear(&self) {
899        self.timings.write().clear();
900    }
901}
902
903impl Default for PerformanceMonitor {
904    fn default() -> Self {
905        Self::new()
906    }
907}
908
909/// GPU acceleration framework using Candle
910pub mod gpu {
911    use super::{
912        Arc, CandleResult, Device, IndexedParallelIterator, ParallelIterator, RwLock, Tensor,
913    };
914    use crate::EvaluationError;
915
    /// GPU-accelerated operations manager
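    ///
    /// # Examples
    ///
    /// A minimal usage sketch (illustrative only; the `voirs_evaluation::performance::gpu`
    /// path is assumed from the file location, and the accelerator silently falls back to
    /// the CPU device when no GPU is present):
    ///
    /// ```ignore
    /// use voirs_evaluation::performance::gpu::GpuAccelerator;
    ///
    /// let accelerator = GpuAccelerator::new().unwrap();
    /// let x = vec![1.0f32, 2.0, 3.0, 4.0];
    /// let y = vec![2.0f32, 4.0, 6.0, 8.0];
    /// // Perfectly linearly related inputs correlate to ~1.0.
    /// let r = accelerator.gpu_correlation(&x, &y).unwrap();
    /// assert!((r - 1.0).abs() < 1e-4);
    /// ```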
917    #[derive(Clone)]
918    pub struct GpuAccelerator {
919        device: Device,
920        memory_pool: Arc<RwLock<Vec<Tensor>>>,
921        max_memory_mb: usize,
922    }
923
924    impl GpuAccelerator {
925        /// Create new GPU accelerator with automatic device detection
926        pub fn new() -> Result<Self, EvaluationError> {
927            let device = Self::detect_best_device()?;
928            let memory_pool = Arc::new(RwLock::new(Vec::new()));
929
930            Ok(Self {
931                device,
932                memory_pool,
933                max_memory_mb: 1024, // 1GB default
934            })
935        }
936
937        /// Create GPU accelerator with specific device
938        #[must_use]
939        pub fn with_device(device: Device) -> Self {
940            Self {
941                device,
942                memory_pool: Arc::new(RwLock::new(Vec::new())),
943                max_memory_mb: 1024,
944            }
945        }
946
947        /// Detect the best available device
948        fn detect_best_device() -> Result<Device, EvaluationError> {
            // Try CUDA first. `Device::cuda_if_available` silently falls back to the CPU
            // device, so `new_cuda` is used here to detect a real CUDA device.
            if let Ok(device) = Device::new_cuda(0) {
                return Ok(device);
            }
953
954            // Try Metal on macOS
955            if let Ok(device) = Device::new_metal(0) {
956                return Ok(device);
957            }
958
959            // Fallback to CPU
960            Ok(Device::Cpu)
961        }
962
963        /// Get the current device
964        #[must_use]
965        pub fn device(&self) -> &Device {
966            &self.device
967        }
968
969        /// Check if GPU acceleration is available
970        #[must_use]
971        pub fn is_gpu_available(&self) -> bool {
972            !matches!(self.device, Device::Cpu)
973        }
974
975        /// GPU-accelerated correlation calculation
976        pub fn gpu_correlation(&self, x: &[f32], y: &[f32]) -> Result<f32, EvaluationError> {
977            if x.len() != y.len() || x.is_empty() {
978                return Ok(0.0);
979            }
980
981            let result = self.compute_correlation_tensor(x, y).map_err(|e| {
982                EvaluationError::MetricCalculationError {
983                    metric: "Correlation".to_string(),
984                    message: format!("GPU correlation failed: {e}"),
985                    source: None,
986                }
987            })?;
988
989            Ok(result)
990        }
991
992        /// GPU-accelerated FFT batch processing
993        pub fn gpu_fft_batch(
994            &self,
995            signals: &[Vec<f32>],
996        ) -> Result<Vec<Vec<f32>>, EvaluationError> {
997            if signals.is_empty() {
998                return Ok(Vec::new());
999            }
1000
1001            let result = self.compute_fft_batch_tensor(signals).map_err(|e| {
1002                EvaluationError::MetricCalculationError {
1003                    metric: "FFT".to_string(),
1004                    message: format!("GPU FFT batch failed: {e}"),
1005                    source: None,
1006                }
1007            })?;
1008
1009            Ok(result)
1010        }
1011
1012        /// GPU-accelerated spectral operations
1013        pub fn gpu_spectral_analysis(
1014            &self,
1015            signal: &[f32],
1016            sample_rate: f32,
1017        ) -> Result<SpectralFeatures, EvaluationError> {
1018            let features = self
1019                .compute_spectral_features(signal, sample_rate)
1020                .map_err(|e| EvaluationError::MetricCalculationError {
1021                    metric: "SpectralAnalysis".to_string(),
1022                    message: format!("GPU spectral analysis failed: {e}"),
1023                    source: None,
1024                })?;
1025
1026            Ok(features)
1027        }
1028
1029        /// GPU-accelerated MCD calculation
1030        pub fn gpu_mcd(
1031            &self,
1032            x_mfcc: &[Vec<f32>],
1033            y_mfcc: &[Vec<f32>],
1034        ) -> Result<f32, EvaluationError> {
1035            let mcd = self.compute_mcd_tensor(x_mfcc, y_mfcc).map_err(|e| {
1036                EvaluationError::MetricCalculationError {
1037                    metric: "MCD".to_string(),
1038                    message: format!("GPU MCD calculation failed: {e}"),
1039                    source: None,
1040                }
1041            })?;
1042
1043            Ok(mcd)
1044        }
1045
1046        /// GPU-accelerated autocorrelation
1047        pub fn gpu_autocorrelation(
1048            &self,
1049            signal: &[f32],
1050            max_lag: usize,
1051        ) -> Result<Vec<f32>, EvaluationError> {
1052            let result = self
1053                .compute_autocorrelation_tensor(signal, max_lag)
1054                .map_err(|e| EvaluationError::MetricCalculationError {
1055                    metric: "Autocorrelation".to_string(),
1056                    message: format!("GPU autocorrelation failed: {e}"),
1057                    source: None,
1058                })?;
1059
1060            Ok(result)
1061        }
1062
1063        // Internal tensor operations
1064
1065        fn compute_correlation_tensor(&self, x: &[f32], y: &[f32]) -> CandleResult<f32> {
1066            let x_tensor = Tensor::from_slice(x, x.len(), &self.device)?;
1067            let y_tensor = Tensor::from_slice(y, y.len(), &self.device)?;
1068
1069            // Calculate means as scalars and get the values
1070            let x_mean_val = x_tensor.mean_all()?.to_scalar::<f32>()?;
1071            let y_mean_val = y_tensor.mean_all()?.to_scalar::<f32>()?;
1072
1073            // Center the data by subtracting scalar values
1074            let x_centered = x_tensor
1075                .to_vec1::<f32>()?
1076                .iter()
1077                .map(|&v| v - x_mean_val)
1078                .collect::<Vec<_>>();
1079            let y_centered = y_tensor
1080                .to_vec1::<f32>()?
1081                .iter()
1082                .map(|&v| v - y_mean_val)
1083                .collect::<Vec<_>>();
1084
1085            let x_centered_tensor =
1086                Tensor::from_slice(&x_centered, x_centered.len(), &self.device)?;
1087            let y_centered_tensor =
1088                Tensor::from_slice(&y_centered, y_centered.len(), &self.device)?;
1089
1090            // Calculate numerator and denominators
1091            let numerator = x_centered_tensor.mul(&y_centered_tensor)?.sum_all()?;
1092            let x_sq_sum = x_centered_tensor.mul(&x_centered_tensor)?.sum_all()?;
1093            let y_sq_sum = y_centered_tensor.mul(&y_centered_tensor)?.sum_all()?;
1094
1095            let denominator = x_sq_sum.mul(&y_sq_sum)?.sqrt()?;
1096
1097            let correlation = if denominator.to_scalar::<f32>()? > f32::EPSILON {
1098                numerator.div(&denominator)?.to_scalar::<f32>()?
1099            } else {
1100                0.0
1101            };
1102
1103            Ok(correlation)
1104        }
1105
1106        fn compute_fft_batch_tensor(&self, signals: &[Vec<f32>]) -> CandleResult<Vec<Vec<f32>>> {
1107            let mut results = Vec::new();
1108
1109            for signal in signals {
1110                if signal.is_empty() {
1111                    results.push(Vec::new());
1112                    continue;
1113                }
1114
1115                let signal_tensor = Tensor::from_slice(signal, signal.len(), &self.device)?;
1116
1117                // Simple magnitude spectrum approximation using convolution
1118                // In a real implementation, you'd use proper FFT operations
1119                let spectrum = self.compute_magnitude_spectrum(&signal_tensor)?;
1120                results.push(spectrum);
1121            }
1122
1123            Ok(results)
1124        }
1125
1126        fn compute_magnitude_spectrum(&self, signal: &Tensor) -> CandleResult<Vec<f32>> {
1127            // Simplified spectrum computation - in practice you'd use proper FFT
1128            let signal_data = signal.to_vec1::<f32>()?;
1129            let spectrum_size = signal_data.len() / 2 + 1;
1130            let mut spectrum = vec![0.0; spectrum_size];
1131
1132            for (i, value) in spectrum.iter_mut().enumerate() {
1133                let frequency_ratio = i as f32 / spectrum_size as f32;
1134                *value = (1.0 - frequency_ratio) * signal_data.iter().map(|x| x.abs()).sum::<f32>()
1135                    / signal_data.len() as f32;
1136            }
1137
1138            Ok(spectrum)
1139        }
1140
1141        fn compute_spectral_features(
1142            &self,
1143            signal: &[f32],
1144            sample_rate: f32,
1145        ) -> CandleResult<SpectralFeatures> {
1146            let signal_tensor = Tensor::from_slice(signal, signal.len(), &self.device)?;
1147
1148            // Compute basic spectral features using GPU
1149            let magnitude_spectrum = self.compute_magnitude_spectrum(&signal_tensor)?;
1150
1151            // Calculate spectral centroid
1152            let mut weighted_sum = 0.0;
1153            let mut total_energy = 0.0;
1154
1155            for (i, &energy) in magnitude_spectrum.iter().enumerate() {
1156                let frequency = i as f32 * sample_rate / (2.0 * magnitude_spectrum.len() as f32);
1157                weighted_sum += frequency * energy;
1158                total_energy += energy;
1159            }
1160
1161            let centroid = if total_energy > 0.0 {
1162                weighted_sum / total_energy
1163            } else {
1164                0.0
1165            };
1166
1167            // Calculate spectral rolloff (frequency below which 85% of energy is contained)
1168            let mut cumulative_energy = 0.0;
1169            let target_energy = total_energy * 0.85;
1170            let mut rolloff = sample_rate / 2.0;
1171
1172            for (i, &energy) in magnitude_spectrum.iter().enumerate() {
1173                cumulative_energy += energy;
1174                if cumulative_energy >= target_energy {
1175                    rolloff = i as f32 * sample_rate / (2.0 * magnitude_spectrum.len() as f32);
1176                    break;
1177                }
1178            }
1179
1180            // Calculate spectral spread
1181            let mut spread_sum = 0.0;
1182            for (i, &energy) in magnitude_spectrum.iter().enumerate() {
1183                let frequency = i as f32 * sample_rate / (2.0 * magnitude_spectrum.len() as f32);
1184                spread_sum += (frequency - centroid).powi(2) * energy;
1185            }
1186
1187            let spread = if total_energy > 0.0 {
1188                (spread_sum / total_energy).sqrt()
1189            } else {
1190                0.0
1191            };
1192
1193            Ok(SpectralFeatures {
1194                centroid,
1195                spread,
1196                rolloff,
1197                flux: 0.0, // Would need previous frame for flux calculation
1198                energy: total_energy,
1199            })
1200        }
1201
1202        fn compute_mcd_tensor(
1203            &self,
1204            x_mfcc: &[Vec<f32>],
1205            y_mfcc: &[Vec<f32>],
1206        ) -> CandleResult<f32> {
1207            if x_mfcc.is_empty() || y_mfcc.is_empty() {
1208                return Ok(f32::INFINITY);
1209            }
1210
1211            let min_frames = x_mfcc.len().min(y_mfcc.len());
1212            let mut total_distance = 0.0;
1213            let mut valid_frames = 0;
1214
1215            for i in 0..min_frames {
1216                if x_mfcc[i].len() == y_mfcc[i].len() && !x_mfcc[i].is_empty() {
1217                    let x_tensor = Tensor::from_slice(&x_mfcc[i], x_mfcc[i].len(), &self.device)?;
1218                    let y_tensor = Tensor::from_slice(&y_mfcc[i], y_mfcc[i].len(), &self.device)?;
1219
1220                    // Skip c0 (energy coefficient) by taking slice from index 1
1221                    let x_ceps = x_tensor.narrow(0, 1, x_mfcc[i].len() - 1)?;
1222                    let y_ceps = y_tensor.narrow(0, 1, y_mfcc[i].len() - 1)?;
1223
1224                    let diff = x_ceps.sub(&y_ceps)?;
1225                    let squared_diff = diff.mul(&diff)?;
1226                    let sum_squared = squared_diff.sum_all()?;
1227
1228                    let distance = sum_squared.sqrt()?.to_scalar::<f32>()?;
1229                    total_distance += distance;
1230                    valid_frames += 1;
1231                }
1232            }
1233
1234            if valid_frames > 0 {
1235                let mcd = (10.0 / std::f32::consts::LN_10) * (total_distance / valid_frames as f32);
1236                Ok(mcd)
1237            } else {
1238                Ok(f32::INFINITY)
1239            }
1240        }
1241
1242        fn compute_autocorrelation_tensor(
1243            &self,
1244            signal: &[f32],
1245            max_lag: usize,
1246        ) -> CandleResult<Vec<f32>> {
1247            if signal.len() < 2 || max_lag == 0 {
1248                return Ok(vec![0.0; max_lag + 1]);
1249            }
1250
1251            let signal_tensor = Tensor::from_slice(signal, signal.len(), &self.device)?;
1252            let effective_max_lag = max_lag.min(signal.len() - 1);
1253            let mut autocorr = vec![0.0; effective_max_lag + 1];
1254
1255            for lag in 0..=effective_max_lag {
1256                if lag < signal.len() {
1257                    let signal1 = signal_tensor.narrow(0, 0, signal.len() - lag)?;
1258                    let signal2 = signal_tensor.narrow(0, lag, signal.len() - lag)?;
1259
1260                    let correlation = signal1.mul(&signal2)?.sum_all()?;
1261                    let norm1 = signal1.mul(&signal1)?.sum_all()?.sqrt()?;
1262                    let norm2 = signal2.mul(&signal2)?.sum_all()?.sqrt()?;
1263
1264                    let denominator = norm1.mul(&norm2)?.to_scalar::<f32>()?;
1265                    if denominator > 0.0 {
1266                        autocorr[lag] = correlation.to_scalar::<f32>()? / denominator;
1267                    }
1268                }
1269            }
1270
1271            Ok(autocorr)
1272        }
1273
1274        /// Clear memory pool to free GPU memory
1275        pub fn clear_memory_pool(&self) {
1276            self.memory_pool.write().clear();
1277        }
1278
1279        /// Set maximum memory usage in MB
1280        pub fn set_max_memory(&mut self, max_memory_mb: usize) {
1281            self.max_memory_mb = max_memory_mb;
1282        }
1283    }
1284
1285    impl Default for GpuAccelerator {
1286        fn default() -> Self {
1287            Self::new().unwrap_or_else(|_| Self::with_device(Device::Cpu))
1288        }
1289    }
1290
1291    /// Spectral features computed on GPU
1292    #[derive(Debug, Clone)]
1293    pub struct SpectralFeatures {
1294        /// Spectral centroid (center of mass of the spectrum)
1295        pub centroid: f32,
1296        /// Spectral spread (variance around the centroid)
1297        pub spread: f32,
        /// Spectral rolloff frequency (85% of spectral energy below this frequency)
1299        pub rolloff: f32,
1300        /// Spectral flux (rate of change of the power spectrum)
1301        pub flux: f32,
1302        /// Total spectral energy
1303        pub energy: f32,
1304    }
1305
1306    /// GPU memory manager for efficient tensor operations
1307    pub struct GpuMemoryManager {
1308        device: Device,
1309        allocated_tensors: Arc<RwLock<Vec<Tensor>>>,
1310        max_cache_size: usize,
1311    }
1312
1313    impl GpuMemoryManager {
1314        /// Create a new GPU memory manager
1315        ///
1316        /// # Arguments
1317        /// * `device` - The GPU device to use for tensor operations
1318        /// * `max_cache_size` - Maximum number of tensors to cache in memory
1319        #[must_use]
1320        pub fn new(device: Device, max_cache_size: usize) -> Self {
1321            Self {
1322                device,
1323                allocated_tensors: Arc::new(RwLock::new(Vec::new())),
1324                max_cache_size,
1325            }
1326        }
1327
1328        /// Allocate a tensor on the GPU with caching
1329        ///
1330        /// # Arguments
1331        /// * `shape` - Shape of the tensor to allocate
1332        ///
1333        /// # Returns
1334        /// A tensor allocated on the GPU device
1335        pub fn allocate_tensor(&self, shape: &[usize]) -> CandleResult<Tensor> {
1336            let tensor = Tensor::zeros(shape, candle_core::DType::F32, &self.device)?;
1337
1338            let mut cache = self.allocated_tensors.write();
1339            if cache.len() < self.max_cache_size {
1340                cache.push(tensor.clone());
1341            }
1342
1343            Ok(tensor)
1344        }
1345
1346        /// Clear the tensor cache to free GPU memory
1347        pub fn clear_cache(&self) {
1348            self.allocated_tensors.write().clear();
1349        }
1350    }
1351}
1352
1353/// Multi-GPU scaling capabilities for distributed computation
1354pub mod multi_gpu {
1355    use super::gpu::{GpuAccelerator, SpectralFeatures};
1356    use super::*;
1357    use std::sync::atomic::{AtomicUsize, Ordering};
1358    use tokio::sync::Semaphore;
1359
    /// Multi-GPU manager for distributed computation across multiple devices
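    ///
    /// # Examples
    ///
    /// A minimal usage sketch (illustrative only; the `voirs_evaluation::performance::multi_gpu`
    /// path is assumed from the file location, and the code must run inside an async runtime
    /// such as Tokio):
    ///
    /// ```ignore
    /// use voirs_evaluation::performance::multi_gpu::MultiGpuManager;
    ///
    /// // Inside an async function:
    /// let manager = MultiGpuManager::new(4).unwrap();
    /// let pairs = vec![(vec![1.0f32, 2.0, 3.0], vec![2.0f32, 4.0, 6.0])];
    /// let correlations = manager.distributed_correlation_batch(&pairs).await.unwrap();
    /// assert_eq!(correlations.len(), 1);
    /// ```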
1361    pub struct MultiGpuManager {
1362        accelerators: Vec<GpuAccelerator>,
1363        load_balancer: AtomicUsize,
1364        semaphore: Arc<Semaphore>,
1365        max_concurrent_ops: usize,
1366    }
1367
1368    impl MultiGpuManager {
1369        /// Create a new multi-GPU manager with automatic device detection
1370        pub fn new(max_concurrent_ops: usize) -> Result<Self, crate::EvaluationError> {
1371            let accelerators = Self::detect_all_devices()?;
1372            let semaphore = Arc::new(Semaphore::new(max_concurrent_ops));
1373
1374            Ok(Self {
1375                accelerators,
1376                load_balancer: AtomicUsize::new(0),
1377                semaphore,
1378                max_concurrent_ops,
1379            })
1380        }
1381
1382        /// Create a multi-GPU manager with specific devices
1383        pub fn with_devices(devices: Vec<Device>, max_concurrent_ops: usize) -> Self {
1384            let accelerators = devices
1385                .into_iter()
1386                .map(GpuAccelerator::with_device)
1387                .collect();
1388            let semaphore = Arc::new(Semaphore::new(max_concurrent_ops));
1389
1390            Self {
1391                accelerators,
1392                load_balancer: AtomicUsize::new(0),
1393                semaphore,
1394                max_concurrent_ops,
1395            }
1396        }
1397
1398        /// Get the number of available GPU devices
1399        #[must_use]
1400        pub fn device_count(&self) -> usize {
1401            self.accelerators.len()
1402        }
1403
1404        /// Check if any GPU acceleration is available
1405        #[must_use]
1406        pub fn has_gpu_acceleration(&self) -> bool {
1407            self.accelerators.iter().any(|acc| acc.is_gpu_available())
1408        }
1409
1410        /// Get device information for all managed devices
1411        pub fn device_info(&self) -> Vec<String> {
1412            self.accelerators
1413                .iter()
1414                .enumerate()
1415                .map(|(idx, acc)| {
1416                    format!(
1417                        "Device {}: {} (GPU: {})",
1418                        idx,
1419                        match acc.device() {
1420                            Device::Cpu => "CPU",
1421                            Device::Cuda(_) => "CUDA",
1422                            Device::Metal(_) => "Metal",
1423                        },
1424                        acc.is_gpu_available()
1425                    )
1426                })
1427                .collect()
1428        }
1429
1430        /// Distribute correlation calculations across multiple GPUs
1431        pub async fn distributed_correlation_batch(
1432            &self,
1433            data_pairs: &[(Vec<f32>, Vec<f32>)],
1434        ) -> Result<Vec<f32>, crate::EvaluationError> {
1435            if data_pairs.is_empty() {
1436                return Ok(Vec::new());
1437            }
1438
1439            // Split work across devices
1440            let chunk_size =
1441                (data_pairs.len() + self.accelerators.len() - 1) / self.accelerators.len();
1442            let chunks: Vec<_> = data_pairs.chunks(chunk_size).collect();
1443
1444            let mut handles = Vec::new();
1445
1446            for (device_idx, chunk) in chunks.into_iter().enumerate() {
1447                let device_idx = device_idx % self.accelerators.len();
1448                let accelerator = &self.accelerators[device_idx];
1449                let chunk_data = chunk.to_vec();
1450
1451                let permit = Arc::clone(&self.semaphore)
1452                    .acquire_owned()
1453                    .await
1454                    .map_err(|_| crate::EvaluationError::MetricCalculationError {
1455                        metric: "MultiGPU".to_string(),
1456                        message: "Failed to acquire semaphore permit".to_string(),
1457                        source: None,
1458                    })?;
1459
1460                let accelerator_clone = accelerator.clone();
1461                let handle = tokio::spawn(async move {
1462                    let _permit = permit;
1463                    let mut results = Vec::new();
1464
1465                    for (x, y) in chunk_data {
1466                        match accelerator_clone.gpu_correlation(&x, &y) {
1467                            Ok(corr) => results.push(corr),
1468                            Err(e) => return Err(e),
1469                        }
1470                    }
1471
1472                    Ok(results)
1473                });
1474
1475                handles.push(handle);
1476            }
1477
1478            // Collect results from all devices
1479            let mut all_results = Vec::new();
1480            for handle in handles {
1481                let chunk_results = handle.await.map_err(|e| {
1482                    crate::EvaluationError::MetricCalculationError {
1483                        metric: "MultiGPU".to_string(),
1484                        message: format!("GPU task failed: {e}"),
1485                        source: None,
1486                    }
1487                })??;
1488                all_results.extend(chunk_results);
1489            }
1490
1491            Ok(all_results)
1492        }
1493
1494        /// Distribute FFT operations across multiple GPUs
1495        pub async fn distributed_fft_batch(
1496            &self,
1497            signals: &[Vec<f32>],
1498        ) -> Result<Vec<Vec<f32>>, crate::EvaluationError> {
1499            if signals.is_empty() {
1500                return Ok(Vec::new());
1501            }
1502
1503            let chunk_size =
1504                (signals.len() + self.accelerators.len() - 1) / self.accelerators.len();
1505            let chunks: Vec<_> = signals.chunks(chunk_size).collect();
1506
1507            let mut handles = Vec::new();
1508
1509            for (device_idx, chunk) in chunks.into_iter().enumerate() {
1510                let device_idx = device_idx % self.accelerators.len();
1511                let accelerator = &self.accelerators[device_idx];
1512                let chunk_data = chunk.to_vec();
1513
1514                let permit = Arc::clone(&self.semaphore)
1515                    .acquire_owned()
1516                    .await
1517                    .map_err(|_| crate::EvaluationError::MetricCalculationError {
1518                        metric: "MultiGPU".to_string(),
1519                        message: "Failed to acquire semaphore permit".to_string(),
1520                        source: None,
1521                    })?;
1522
1523                let accelerator_clone = accelerator.clone();
1524                let handle = tokio::spawn(async move {
1525                    let _permit = permit;
1526                    accelerator_clone.gpu_fft_batch(&chunk_data)
1527                });
1528
1529                handles.push(handle);
1530            }
1531
1532            // Collect results maintaining order
1533            let mut all_results = Vec::new();
1534            for handle in handles {
1535                let chunk_results = handle.await.map_err(|e| {
1536                    crate::EvaluationError::MetricCalculationError {
1537                        metric: "MultiGPU".to_string(),
1538                        message: format!("GPU FFT task failed: {e}"),
1539                        source: None,
1540                    }
1541                })??;
1542                all_results.extend(chunk_results);
1543            }
1544
1545            Ok(all_results)
1546        }
1547
1548        /// Distribute spectral analysis across multiple GPUs
1549        pub async fn distributed_spectral_analysis(
1550            &self,
1551            signals: &[Vec<f32>],
1552            sample_rate: f32,
1553        ) -> Result<Vec<SpectralFeatures>, crate::EvaluationError> {
1554            if signals.is_empty() {
1555                return Ok(Vec::new());
1556            }
1557
1558            let chunk_size =
1559                (signals.len() + self.accelerators.len() - 1) / self.accelerators.len();
1560            let chunks: Vec<_> = signals.chunks(chunk_size).collect();
1561
1562            let mut handles = Vec::new();
1563
1564            for (device_idx, chunk) in chunks.into_iter().enumerate() {
1565                let device_idx = device_idx % self.accelerators.len();
1566                let accelerator = &self.accelerators[device_idx];
1567                let chunk_data = chunk.to_vec();
1568
1569                let permit = Arc::clone(&self.semaphore)
1570                    .acquire_owned()
1571                    .await
1572                    .map_err(|_| crate::EvaluationError::MetricCalculationError {
1573                        metric: "MultiGPU".to_string(),
1574                        message: "Failed to acquire semaphore permit".to_string(),
1575                        source: None,
1576                    })?;
1577
1578                let accelerator_clone = accelerator.clone();
1579                let handle = tokio::spawn(async move {
1580                    let _permit = permit;
1581                    let mut results = Vec::new();
1582
1583                    for signal in chunk_data {
1584                        match accelerator_clone.gpu_spectral_analysis(&signal, sample_rate) {
1585                            Ok(features) => results.push(features),
1586                            Err(e) => return Err(e),
1587                        }
1588                    }
1589
1590                    Ok(results)
1591                });
1592
1593                handles.push(handle);
1594            }
1595
1596            // Collect results
1597            let mut all_results = Vec::new();
1598            for handle in handles {
1599                let chunk_results = handle.await.map_err(|e| {
1600                    crate::EvaluationError::MetricCalculationError {
1601                        metric: "MultiGPU".to_string(),
1602                        message: format!("GPU spectral analysis task failed: {e}"),
1603                        source: None,
1604                    }
1605                })??;
1606                all_results.extend(chunk_results);
1607            }
1608
1609            Ok(all_results)
1610        }
1611
1612        /// Get next device using round-robin load balancing
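        ///
        /// # Example
        ///
        /// Illustrative sketch (`ignore`d); two CPU devices are used so no GPU is required:
        ///
        /// ```ignore
        /// let manager = MultiGpuManager::with_devices(vec![Device::Cpu, Device::Cpu], 4);
        /// let first = manager.get_next_device();
        /// let _second = manager.get_next_device();
        /// let third = manager.get_next_device(); // wraps back to the first device
        /// assert!(std::ptr::eq(first, third));
        /// ```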
1613        pub fn get_next_device(&self) -> &GpuAccelerator {
1614            let index =
1615                self.load_balancer.fetch_add(1, Ordering::Relaxed) % self.accelerators.len();
1616            &self.accelerators[index]
1617        }
1618
1619        /// Clear memory pools on all devices
1620        pub fn clear_all_memory_pools(&self) {
1621            for accelerator in &self.accelerators {
1622                accelerator.clear_memory_pool();
1623            }
1624        }
1625
1626        /// Set maximum memory usage for all devices
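        ///
        /// The limit is interpreted in megabytes. Illustrative sketch (`ignore`d):
        ///
        /// ```ignore
        /// let mut manager = MultiGpuManager::new(4)?;
        /// manager.set_max_memory_all(2048); // cap each device at roughly 2 GiB
        /// ```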
1627        pub fn set_max_memory_all(&mut self, max_memory_mb: usize) {
1628            for accelerator in &mut self.accelerators {
1629                accelerator.set_max_memory(max_memory_mb);
1630            }
1631        }
1632
1633        // Private helper methods
1634
1635        fn detect_all_devices() -> Result<Vec<GpuAccelerator>, crate::EvaluationError> {
1636            let mut accelerators = Vec::new();
1637
1638            // Try to detect CUDA devices; `Device::new_cuda` errors on unavailable ordinals, so the loop stops at the first missing device (`cuda_if_available` would silently fall back to CPU and never break)
1639            for device_id in 0..8 {
1640                if let Ok(device) = Device::new_cuda(device_id) {
1641                    accelerators.push(GpuAccelerator::with_device(device));
1642                } else {
1643                    break;
1644                }
1645            }
1646
1647            // Try to detect Metal devices
1648            for device_id in 0..4 {
1649                if let Ok(device) = Device::new_metal(device_id) {
1650                    accelerators.push(GpuAccelerator::with_device(device));
1651                } else {
1652                    break;
1653                }
1654            }
1655
1656            // Always include CPU as fallback
1657            if accelerators.is_empty() {
1658                accelerators.push(GpuAccelerator::with_device(Device::Cpu));
1659            }
1660
1661            Ok(accelerators)
1662        }
1663    }
1664
1665    impl Clone for MultiGpuManager {
1666        fn clone(&self) -> Self {
1667            Self {
1668                accelerators: self.accelerators.clone(),
1669                load_balancer: AtomicUsize::new(self.load_balancer.load(Ordering::Relaxed)),
1670                semaphore: Arc::new(Semaphore::new(self.max_concurrent_ops)),
1671                max_concurrent_ops: self.max_concurrent_ops,
1672            }
1673        }
1674    }
1675
1676    /// Performance metrics for multi-GPU operations
1677    #[derive(Debug, Clone)]
1678    pub struct MultiGpuMetrics {
1679        /// Total number of operations performed
1680        pub total_operations: usize,
1681        /// Operations per device
1682        pub operations_per_device: Vec<usize>,
1683        /// Average operation time per device
1684        pub avg_time_per_device: Vec<std::time::Duration>,
1685        /// Memory usage per device
1686        pub memory_usage_per_device: Vec<u64>,
1687        /// Overall throughput (operations per second)
1688        pub throughput: f64,
1689    }
1690
1691    impl MultiGpuMetrics {
1692        /// Create new metrics instance
1693        #[must_use]
1694        pub fn new(device_count: usize) -> Self {
1695            Self {
1696                total_operations: 0,
1697                operations_per_device: vec![0; device_count],
1698                avg_time_per_device: vec![std::time::Duration::ZERO; device_count],
1699                memory_usage_per_device: vec![0; device_count],
1700                throughput: 0.0,
1701            }
1702        }
1703
1704        /// Calculate load balance efficiency (1.0 = perfectly balanced; values near 0.0 indicate a heavily skewed load)
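        ///
        /// # Example
        ///
        /// A small sketch (`ignore`d), mirroring the unit tests at the bottom of this file:
        ///
        /// ```ignore
        /// let mut metrics = MultiGpuMetrics::new(3);
        /// metrics.operations_per_device = vec![10, 10, 10];
        /// assert!(metrics.load_balance_efficiency() > 0.9); // evenly distributed work
        /// metrics.operations_per_device = vec![20, 5, 5];
        /// assert!(metrics.load_balance_efficiency() < 1.0); // skewed toward one device
        /// ```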
1705        #[must_use]
1706        pub fn load_balance_efficiency(&self) -> f32 {
1707            if self.operations_per_device.is_empty() {
1708                return 1.0;
1709            }
1710
1711            let avg_ops = self.operations_per_device.iter().sum::<usize>() as f32
1712                / self.operations_per_device.len() as f32;
1713
1714            if avg_ops == 0.0 {
1715                return 1.0;
1716            }
1717
1718            let variance = self
1719                .operations_per_device
1720                .iter()
1721                .map(|&ops| {
1722                    let diff = ops as f32 - avg_ops;
1723                    diff * diff
1724                })
1725                .sum::<f32>()
1726                / self.operations_per_device.len() as f32;
1727
1728            let coefficient_of_variation = variance.sqrt() / avg_ops;
1729            (1.0 - coefficient_of_variation).max(0.0)
1730        }
1731
1732        /// Get the index of the device with the lowest average operation time (i.e. highest throughput)
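        ///
        /// Returns `None` when no timing samples have been recorded. Illustrative sketch (`ignore`d):
        ///
        /// ```ignore
        /// if let Some(idx) = metrics.fastest_device() {
        ///     println!("device {idx} has the lowest average operation time");
        /// }
        /// ```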
1733        #[must_use]
1734        pub fn fastest_device(&self) -> Option<usize> {
1735            self.avg_time_per_device
1736                .iter()
1737                .enumerate()
1738                .filter(|(_, &time)| time > std::time::Duration::ZERO)
1739                .min_by_key(|(_, &time)| time)
1740                .map(|(idx, _)| idx)
1741        }
1742    }
1743}
1744
1745#[cfg(test)]
1746mod tests {
1747    use super::*;
1748
1749    #[test]
1750    fn test_parallel_correlation() {
1751        let x = vec![1.0, 2.0, 3.0, 4.0, 5.0];
1752        let y = vec![1.0, 2.0, 3.0, 4.0, 5.0];
1753
1754        let corr = parallel_correlation(&x, &y);
1755        assert!((corr - 1.0).abs() < 0.001);
1756    }
1757
1758    #[test]
1759    fn test_lru_cache() {
1760        let cache = LRUCache::new(2);
1761
1762        cache.insert("key1", "value1");
1763        cache.insert("key2", "value2");
1764
1765        assert_eq!(cache.get(&"key1"), Some("value1"));
1766        assert_eq!(cache.get(&"key2"), Some("value2"));
1767
1768        // Should trigger eviction
1769        cache.insert("key3", "value3");
1770        assert_eq!(cache.len(), 1);
1771    }
1772
1773    #[test]
1774    fn test_sliding_window_processor() {
1775        let processor = SlidingWindowProcessor::new(3, 1);
1776        let data = vec![1, 2, 3, 4, 5];
1777
1778        let results = processor.process_parallel(&data, |window| window.iter().sum::<i32>());
1779
1780        assert_eq!(results, vec![6, 9, 12]); // [1,2,3], [2,3,4], [3,4,5]
1781    }
1782
1783    #[test]
1784    fn test_simd_operations() {
1785        let a = vec![1.0, 2.0, 3.0, 4.0];
1786        let b = vec![2.0, 3.0, 4.0, 5.0];
1787
1788        let dot = simd::dot_product(&a, &b);
1789        assert_eq!(dot, 40.0); // 1*2 + 2*3 + 3*4 + 4*5
1790
1791        let product = simd::element_wise_multiply(&a, &b);
1792        assert_eq!(product, vec![2.0, 6.0, 12.0, 20.0]);
1793
1794        let rms = simd::rms(&a);
1795        assert!((rms - 2.738_613).abs() < 0.001);
1796    }
1797
1798    #[test]
1799    fn test_simd_vector_add() {
1800        let a = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0];
1801        let b = vec![1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];
1802
1803        let result = simd::vector_add(&a, &b);
1804        assert_eq!(result, vec![2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]);
1805
1806        // Test empty vectors
1807        let empty_result = simd::vector_add(&[], &[]);
1808        assert_eq!(empty_result, Vec::<f32>::new());
1809
1810        // Test mismatched lengths
1811        let mismatch_result = simd::vector_add(&a, &[1.0]);
1812        assert_eq!(mismatch_result, Vec::<f32>::new());
1813    }
1814
1815    #[test]
1816    fn test_simd_vector_subtract() {
1817        let a = vec![5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0];
1818        let b = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0];
1819
1820        let result = simd::vector_subtract(&a, &b);
1821        assert_eq!(result, vec![4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0]);
1822
1823        // Test empty vectors
1824        let empty_result = simd::vector_subtract(&[], &[]);
1825        assert_eq!(empty_result, Vec::<f32>::new());
1826
1827        // Test mismatched lengths
1828        let mismatch_result = simd::vector_subtract(&a, &[1.0]);
1829        assert_eq!(mismatch_result, Vec::<f32>::new());
1830    }
1831
1832    #[test]
1833    fn test_simd_large_vectors() {
1834        // Test with vectors larger than SIMD thresholds to ensure parallel fallback
1835        let size = 2000;
1836        let a: Vec<f32> = (0..size).map(|i| i as f32).collect();
1837        let b: Vec<f32> = (0..size).map(|i| (i + 1) as f32).collect();
1838
1839        let dot = simd::dot_product(&a, &b);
1840        let expected_dot: f32 = a.iter().zip(&b).map(|(x, y)| x * y).sum();
1841        // Use relative tolerance for large numbers
1842        let tolerance = (expected_dot * 1e-5).max(1.0);
1843        assert!((dot - expected_dot).abs() < tolerance);
1844
1845        let product = simd::element_wise_multiply(&a, &b);
1846        let expected_product: Vec<f32> = a.iter().zip(&b).map(|(x, y)| x * y).collect();
1847        assert_eq!(product.len(), expected_product.len());
1848        for (actual, expected) in product.iter().zip(&expected_product) {
1849            let tolerance = (expected.abs() * 1e-5).max(1e-3);
1850            assert!((actual - expected).abs() < tolerance);
1851        }
1852    }
1853
1854    #[test]
1855    fn test_simd_spectral_centroid() {
1856        let spectrum = vec![0.0, 1.0, 2.0, 1.0, 0.0];
1857        let sample_rate = 16000.0;
1858
1859        let centroid = simd::spectral_centroid(&spectrum, sample_rate);
1860
1861        // Manual calculation for verification
1862        let frequencies: Vec<f32> = (0..spectrum.len())
1863            .map(|i| i as f32 * sample_rate / (2.0 * spectrum.len() as f32))
1864            .collect();
1865
1866        let weighted_sum: f32 = spectrum
1867            .iter()
1868            .zip(&frequencies)
1869            .map(|(mag, freq)| mag * freq)
1870            .sum();
1871        let magnitude_sum: f32 = spectrum.iter().sum();
1872        let expected = if magnitude_sum > 0.0 {
1873            weighted_sum / magnitude_sum
1874        } else {
1875            0.0
1876        };
1877
1878        assert!((centroid - expected).abs() < 0.001);
1879    }
1880
1881    #[test]
1882    fn test_simd_edge_cases() {
1883        // Test with zero vectors
1884        let zeros = vec![0.0; 8];
1885        let ones = vec![1.0; 8];
1886
1887        assert_eq!(simd::dot_product(&zeros, &ones), 0.0);
1888        assert_eq!(simd::rms(&zeros), 0.0);
1889        assert_eq!(simd::spectral_centroid(&zeros, 16000.0), 0.0);
1890
1891        // Test with single element
1892        let single_a = vec![5.0];
1893        let single_b = vec![3.0];
1894
1895        assert_eq!(simd::dot_product(&single_a, &single_b), 15.0);
1896        assert_eq!(simd::vector_add(&single_a, &single_b), vec![8.0]);
1897        assert_eq!(simd::vector_subtract(&single_a, &single_b), vec![2.0]);
1898    }
1899
1900    #[test]
1901    fn test_simd_numerical_precision() {
1902        // Test with very small numbers to ensure precision
1903        let a = vec![1e-6, 2e-6, 3e-6, 4e-6];
1904        let b = vec![1e-6, 2e-6, 3e-6, 4e-6];
1905
1906        let dot = simd::dot_product(&a, &b);
1907        let expected = 1e-12 + 4e-12 + 9e-12 + 16e-12; // 30e-12
1908        assert!((dot - expected).abs() < 1e-15);
1909
1910        // Test with very large numbers
1911        let large_a = vec![1e6, 2e6, 3e6, 4e6];
1912        let large_b = vec![1e6, 2e6, 3e6, 4e6];
1913
1914        let large_dot = simd::dot_product(&large_a, &large_b);
1915        let large_expected = 1e12 + 4e12 + 9e12 + 16e12; // 30e12
1916        assert!((large_dot - large_expected).abs() < 1e9);
1917    }
1918
1919    #[test]
1920    fn test_simd_performance_consistency() {
1921        // Verify that SIMD and scalar implementations produce the same results
1922        let size = 100;
1923        let a: Vec<f32> = (0..size).map(|i| (i as f32 * 0.1).sin()).collect();
1924        let b: Vec<f32> = (0..size).map(|i| (i as f32 * 0.1).cos()).collect();
1925
1926        let simd_dot = simd::dot_product(&a, &b);
1927        let scalar_dot: f32 = a.iter().zip(&b).map(|(x, y)| x * y).sum();
1928
1929        // Results should be very close (allowing for minor floating point differences)
1930        assert!((simd_dot - scalar_dot).abs() < 1e-5);
1931
1932        let simd_product = simd::element_wise_multiply(&a, &b);
1933        let scalar_product: Vec<f32> = a.iter().zip(&b).map(|(x, y)| x * y).collect();
1934
1935        assert_eq!(simd_product.len(), scalar_product.len());
1936        for (simd_val, scalar_val) in simd_product.iter().zip(&scalar_product) {
1937            assert!((simd_val - scalar_val).abs() < 1e-6);
1938        }
1939    }
1940
1941    #[test]
1942    fn test_performance_monitor() {
1943        let monitor = PerformanceMonitor::new();
1944
1945        let result = monitor.time_operation("test_op", || {
1946            std::thread::sleep(std::time::Duration::from_millis(10));
1947            42
1948        });
1949
1950        assert_eq!(result, 42);
1951        let avg_time = monitor.get_average_time("test_op").unwrap();
1952        assert!(avg_time >= std::time::Duration::from_millis(10));
1953    }
1954
1955    #[test]
1956    fn test_gpu_accelerator_creation() {
1957        let accelerator = gpu::GpuAccelerator::default();
1958
1959        // Should always succeed with fallback to CPU
1960        assert!(matches!(accelerator.device(), Device::Cpu) || accelerator.is_gpu_available());
1961    }
1962
1963    #[test]
1964    fn test_gpu_correlation() {
1965        let accelerator = gpu::GpuAccelerator::default();
1966        let x = vec![1.0, 2.0, 3.0, 4.0, 5.0];
1967        let y = vec![1.0, 2.0, 3.0, 4.0, 5.0];
1968
1969        let correlation = accelerator.gpu_correlation(&x, &y).unwrap();
1970        assert!((correlation - 1.0).abs() < 0.001);
1971    }
1972
1973    #[test]
1974    fn test_gpu_autocorrelation() {
1975        let accelerator = gpu::GpuAccelerator::default();
1976        let signal = vec![1.0, 0.8, 0.6, 0.4, 0.2];
1977
1978        let autocorr = accelerator.gpu_autocorrelation(&signal, 3).unwrap();
1979        assert_eq!(autocorr.len(), 4); // 0 to 3 lags inclusive
1980        assert!((autocorr[0] - 1.0).abs() < 0.001); // Perfect correlation at lag 0
1981    }
1982
1983    #[test]
1984    fn test_gpu_spectral_analysis() {
1985        let accelerator = gpu::GpuAccelerator::default();
1986        let signal = vec![0.1; 1024];
1987        let sample_rate = 16000.0;
1988
1989        let features = accelerator
1990            .gpu_spectral_analysis(&signal, sample_rate)
1991            .unwrap();
1992        assert!(features.centroid >= 0.0);
1993        assert!(features.energy >= 0.0);
1994        assert!(features.rolloff >= 0.0);
1995        assert!(features.spread >= 0.0);
1996    }
1997
1998    #[test]
1999    fn test_gpu_memory_manager() {
2000        let device = Device::Cpu; // Use CPU for testing
2001        let manager = gpu::GpuMemoryManager::new(device, 10);
2002
2003        let tensor = manager.allocate_tensor(&[10, 10]).unwrap();
2004        assert_eq!(tensor.shape().dims(), &[10, 10]);
2005
2006        manager.clear_cache();
2007    }
2008
2009    #[test]
2010    fn test_persistent_cache() {
2011        use tempfile::TempDir;
2012
2013        let temp_dir = TempDir::new().unwrap();
2014        let cache_dir = temp_dir.path().join("test_cache");
2015
2016        let cache = PersistentCache::new(&cache_dir, 10, 6).unwrap();
2017
2018        // Test insertion and retrieval
2019        assert!(cache
2020            .insert("key1".to_string(), "value1".to_string())
2021            .is_ok());
2022        assert_eq!(cache.get(&"key1".to_string()), Some("value1".to_string()));
2023
2024        // Test disk persistence
2025        let cache2 = PersistentCache::new(&cache_dir, 10, 6).unwrap();
2026        assert_eq!(cache2.get(&"key1".to_string()), Some("value1".to_string()));
2027
2028        // Test statistics
2029        let stats = cache.stats();
2030        assert!(stats.disk_entries > 0);
2031        assert!(stats.cache_dir_size > 0);
2032
2033        // Test clear
2034        assert!(cache.clear().is_ok());
2035        assert!(cache.is_empty());
2036    }
2037
2038    #[test]
2039    fn test_persistent_cache_compression() {
2040        use tempfile::TempDir;
2041
2042        let temp_dir = TempDir::new().unwrap();
2043        let cache_dir = temp_dir.path().join("test_compression");
2044
2045        let mut cache = PersistentCache::new(&cache_dir, 10, 1).unwrap();
2046
2047        // Test compression level setting
2048        cache.set_compression_level(9);
2049
2050        // Insert large value to test compression
2051        let large_value = "x".repeat(1000);
2052        assert!(cache
2053            .insert("large_key".to_string(), large_value.clone())
2054            .is_ok());
2055        assert_eq!(cache.get(&"large_key".to_string()), Some(large_value));
2056
2057        // Test that compressed file is smaller than original
2058        let stats = cache.stats();
2059        assert!(stats.cache_dir_size < 1000); // Should be compressed
2060    }
2061
2062    #[test]
2063    fn test_persistent_cache_memory_fallback() {
2064        use tempfile::TempDir;
2065
2066        let temp_dir = TempDir::new().unwrap();
2067        let cache_dir = temp_dir.path().join("test_memory");
2068
2069        let cache = PersistentCache::new(&cache_dir, 2, 6).unwrap();
2070
2071        // Fill memory cache
2072        assert!(cache
2073            .insert("key1".to_string(), "value1".to_string())
2074            .is_ok());
2075        assert!(cache
2076            .insert("key2".to_string(), "value2".to_string())
2077            .is_ok());
2078
2079        // This should evict from memory but still be available on disk
2080        assert!(cache
2081            .insert("key3".to_string(), "value3".to_string())
2082            .is_ok());
2083
2084        // All values should still be retrievable
2085        assert_eq!(cache.get(&"key1".to_string()), Some("value1".to_string()));
2086        assert_eq!(cache.get(&"key2".to_string()), Some("value2".to_string()));
2087        assert_eq!(cache.get(&"key3".to_string()), Some("value3".to_string()));
2088
2089        let stats = cache.stats();
2090        assert_eq!(stats.disk_entries, 3);
2091    }
2092
2093    #[test]
2094    fn test_multi_gpu_manager_creation() {
2095        use multi_gpu::MultiGpuManager;
2096
2097        let manager = MultiGpuManager::new(4).unwrap();
2098
2099        // Should have at least one device (CPU fallback)
2100        assert!(manager.device_count() >= 1);
2101
2102        // Device info should be available
2103        let info = manager.device_info();
2104        assert!(!info.is_empty());
2105        assert!(!info[0].is_empty());
2106    }
2107
2108    #[tokio::test]
2109    async fn test_multi_gpu_distributed_correlation() {
2110        use multi_gpu::MultiGpuManager;
2111
2112        let manager = MultiGpuManager::new(2).unwrap();
2113
2114        let data_pairs = vec![
2115            (vec![1.0, 2.0, 3.0], vec![1.0, 2.0, 3.0]),
2116            (vec![4.0, 5.0, 6.0], vec![4.0, 5.0, 6.0]),
2117            (vec![1.0, 0.0, -1.0], vec![-1.0, 0.0, 1.0]),
2118        ];
2119
2120        let correlations = manager
2121            .distributed_correlation_batch(&data_pairs)
2122            .await
2123            .unwrap();
2124
2125        assert_eq!(correlations.len(), 3);
2126        assert!((correlations[0] - 1.0).abs() < 0.001); // Perfect correlation
2127        assert!((correlations[1] - 1.0).abs() < 0.001); // Perfect correlation
2128        assert!((correlations[2] + 1.0).abs() < 0.001); // Perfect negative correlation
2129    }
2130
2131    #[tokio::test]
2132    async fn test_multi_gpu_distributed_fft() {
2133        use multi_gpu::MultiGpuManager;
2134
2135        let manager = MultiGpuManager::new(2).unwrap();
2136
2137        let signals = vec![vec![1.0, 0.0, -1.0, 0.0], vec![0.5, 1.0, 0.5, 0.0]];
2138
2139        let spectra = manager.distributed_fft_batch(&signals).await.unwrap();
2140
2141        assert_eq!(spectra.len(), 2);
2142        assert!(!spectra[0].is_empty());
2143        assert!(!spectra[1].is_empty());
2144    }
2145
2146    #[tokio::test]
2147    async fn test_multi_gpu_distributed_spectral_analysis() {
2148        use multi_gpu::MultiGpuManager;
2149
2150        let manager = MultiGpuManager::new(2).unwrap();
2151
2152        let signals = vec![vec![0.1; 512], vec![0.2; 512]];
2153        let sample_rate = 16000.0;
2154
2155        let features = manager
2156            .distributed_spectral_analysis(&signals, sample_rate)
2157            .await
2158            .unwrap();
2159
2160        assert_eq!(features.len(), 2);
2161        for feature in &features {
2162            assert!(feature.centroid >= 0.0);
2163            assert!(feature.energy >= 0.0);
2164            assert!(feature.rolloff >= 0.0);
2165        }
2166    }
2167
2168    #[test]
2169    fn test_multi_gpu_metrics() {
2170        use multi_gpu::MultiGpuMetrics;
2171
2172        let mut metrics = MultiGpuMetrics::new(3);
2173        metrics.operations_per_device = vec![10, 10, 10]; // Perfectly balanced
2174
2175        let efficiency = metrics.load_balance_efficiency();
2176        assert!((efficiency - 1.0).abs() < 0.1); // Should be close to perfectly balanced
2177
2178        // Test unbalanced case
2179        metrics.operations_per_device = vec![20, 5, 5]; // Unbalanced
2180        let efficiency = metrics.load_balance_efficiency();
2181        assert!(efficiency < 1.0); // Should be less than perfectly balanced
2182    }
2183
2184    #[test]
2185    fn test_multi_gpu_load_balancing() {
2186        use multi_gpu::MultiGpuManager;
2187
2188        let devices = vec![Device::Cpu, Device::Cpu]; // Use CPU devices for testing
2189        let manager = MultiGpuManager::with_devices(devices, 4);
2190
2191        // Test round-robin load balancing
2192        let device1 = manager.get_next_device();
2193        let _device2 = manager.get_next_device(); // advances the round-robin counter; value itself is unused
2194        let device3 = manager.get_next_device(); // Should wrap back to first device
2195
2196        assert!(std::ptr::eq(device1, device3)); // Should be the same device due to round-robin
2197    }
2198}