//! Multi-GPU sparse FFT processing.
//!
//! Distributes sparse FFT computation across multiple GPU devices (CUDA, HIP,
//! SYCL) with a CPU fallback, supporting several workload-distribution
//! strategies including adaptive, history-based scheduling.

use crate::error::{FFTError, FFTResult};
use crate::sparse_fft::{SparseFFTAlgorithm, SparseFFTConfig, SparseFFTResult, WindowFunction};
use crate::sparse_fft_gpu::{GPUBackend, GPUSparseFFTConfig};
use crate::sparse_fft_gpu_memory::{
init_cuda_device, init_hip_device, init_sycl_device, is_cuda_available, is_hip_available,
is_sycl_available,
};
use scirs2_core::numeric::Complex64;
use scirs2_core::numeric::NumCast;
use scirs2_core::parallel_ops::*;
use scirs2_core::simd_ops::PlatformCapabilities;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::Instant;
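/// Properties of a single compute device discovered during enumeration.
///
/// Memory figures are in bytes. `device_id` is `-1` for the CPU fallback, and
/// `compute_capability` follows each backend's own versioning scheme.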
#[derive(Debug, Clone)]
pub struct GPUDeviceInfo {
pub device_id: i32,
pub backend: GPUBackend,
pub device_name: String,
pub memory_total: usize,
pub memory_free: usize,
pub compute_capability: f32,
pub compute_units: usize,
pub max_threads_per_block: usize,
pub is_available: bool,
}
impl Default for GPUDeviceInfo {
fn default() -> Self {
Self {
device_id: -1,
backend: GPUBackend::CPUFallback,
device_name: "Unknown Device".to_string(),
memory_total: 0,
memory_free: 0,
compute_capability: 0.0,
compute_units: 0,
max_threads_per_block: 0,
is_available: false,
}
}
}
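/// Strategy for splitting a signal across the selected devices.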
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkloadDistribution {
    /// Split the signal into equally sized chunks.
    Equal,
    /// Size chunks in proportion to each device's free memory.
    MemoryBased,
    /// Size chunks in proportion to compute capability times compute units.
    ComputeBased,
    /// Use the ratios given in `MultiGPUConfig::manual_ratios`.
    Manual,
    /// Rank devices by recorded performance history, falling back to compute
    /// capability when no history exists.
    Adaptive,
}
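/// Configuration for multi-GPU sparse FFT processing.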
#[derive(Debug, Clone)]
pub struct MultiGPUConfig {
    /// Sparse FFT settings shared by every device.
    pub base_config: SparseFFTConfig,
    /// How the signal is split across devices.
    pub distribution: WorkloadDistribution,
    /// Per-device split ratios; consulted only by `WorkloadDistribution::Manual`.
    pub manual_ratios: Vec<f32>,
    /// Maximum number of devices to use (`0` means all available).
    pub max_devices: usize,
    /// Signals shorter than this are processed on a single device.
    pub min_signal_size: usize,
    /// Overlap between adjacent chunks, in samples (not yet applied by the
    /// splitting logic in this module).
    pub chunk_overlap: usize,
    /// Enables runtime load balancing between devices.
    pub enable_load_balancing: bool,
    /// Per-device operation timeout, in milliseconds.
    pub device_timeout_ms: u64,
}
impl Default for MultiGPUConfig {
fn default() -> Self {
Self {
base_config: SparseFFTConfig::default(),
distribution: WorkloadDistribution::ComputeBased,
manual_ratios: Vec::new(),
            max_devices: 0,
            min_signal_size: 4096,
            chunk_overlap: 0,
enable_load_balancing: true,
device_timeout_ms: 5000,
}
}
}
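/// Coordinates sparse FFT execution across multiple GPU devices: enumerates
/// CUDA, HIP, and SYCL devices (plus a CPU fallback), splits the input signal
/// into per-device chunks, and merges the partial results. Per-device timings
/// are recorded so the `Adaptive` strategy can favor faster devices.
///
/// # Example
///
/// A minimal usage sketch (marked `ignore`: which devices exist is only known
/// at runtime, and the enclosing crate path may differ; `signal` is assumed to
/// be a `Vec<f64>` built by the caller):
///
/// ```ignore
/// let mut processor = MultiGPUSparseFFT::new(MultiGPUConfig::default());
/// processor.initialize()?;
/// println!("selected {} device(s)", processor.get_selected_devices().len());
/// let result = processor.sparse_fft(&signal)?;
/// ```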
pub struct MultiGPUSparseFFT {
_config: MultiGPUConfig,
devices: Vec<GPUDeviceInfo>,
selected_devices: Vec<usize>,
performance_history: Arc<Mutex<HashMap<i32, Vec<f64>>>>,
initialized: bool,
}
impl MultiGPUSparseFFT {
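    /// Creates a new multi-GPU sparse FFT processor with the given
    /// configuration. Devices are not enumerated until
    /// [`initialize`](Self::initialize) or the first transform call.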
pub fn new(config: MultiGPUConfig) -> Self {
Self {
_config: config,
devices: Vec::new(),
selected_devices: Vec::new(),
performance_history: Arc::new(Mutex::new(HashMap::new())),
initialized: false,
}
}
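    /// Enumerates available devices and selects a working set according to
    /// the configured distribution strategy. Idempotent: repeated calls are
    /// no-ops.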
pub fn initialize(&mut self) -> FFTResult<()> {
if self.initialized {
return Ok(());
}
self.enumerate_devices()?;
self.select_devices()?;
self.initialized = true;
Ok(())
}
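    /// Discovers CUDA, HIP, and SYCL devices, always appending a CPU fallback
    /// entry so at least one device is available.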
fn enumerate_devices(&mut self) -> FFTResult<()> {
self.devices.clear();
if is_cuda_available() {
self.enumerate_cuda_devices()?;
}
if is_hip_available() {
self.enumerate_hip_devices()?;
}
if is_sycl_available() {
self.enumerate_sycl_devices()?;
}
#[cfg(target_pointer_width = "32")]
        let (memory_total, memory_free) = (1024 * 1024 * 1024, 512 * 1024 * 1024);
        #[cfg(target_pointer_width = "64")]
let (memory_total, memory_free) =
(16usize * 1024 * 1024 * 1024, 8usize * 1024 * 1024 * 1024);
self.devices.push(GPUDeviceInfo {
device_id: -1,
backend: GPUBackend::CPUFallback,
device_name: "CPU Fallback".to_string(),
memory_total,
memory_free,
compute_capability: 1.0,
compute_units: num_cpus::get(),
max_threads_per_block: 1,
is_available: true,
});
Ok(())
}
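    /// Registers a simulated CUDA device if the CUDA runtime initializes; the
    /// reported capabilities are placeholder values, not queried ones.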
fn enumerate_cuda_devices(&mut self) -> FFTResult<()> {
if init_cuda_device()? {
#[cfg(target_pointer_width = "32")]
            let (memory_total, memory_free) = (512 * 1024 * 1024, 384 * 1024 * 1024);
            #[cfg(target_pointer_width = "64")]
let (memory_total, memory_free) =
(8usize * 1024 * 1024 * 1024, 6usize * 1024 * 1024 * 1024);
self.devices.push(GPUDeviceInfo {
device_id: 0,
backend: GPUBackend::CUDA,
device_name: "NVIDIA GPU (simulated)".to_string(),
memory_total,
memory_free,
compute_capability: 8.6,
compute_units: 68,
max_threads_per_block: 1024,
is_available: true,
});
}
Ok(())
}
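    /// Registers a simulated HIP (AMD) device if the HIP runtime initializes.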
fn enumerate_hip_devices(&mut self) -> FFTResult<()> {
if init_hip_device()? {
#[cfg(target_pointer_width = "32")]
            let (memory_total, memory_free) = (1024 * 1024 * 1024, 768 * 1024 * 1024);
            #[cfg(target_pointer_width = "64")]
let (memory_total, memory_free) =
(16usize * 1024 * 1024 * 1024, 12usize * 1024 * 1024 * 1024);
self.devices.push(GPUDeviceInfo {
device_id: 0,
backend: GPUBackend::HIP,
device_name: "AMD GPU (simulated)".to_string(),
memory_total,
memory_free,
                compute_capability: 10.3,
                compute_units: 40,
max_threads_per_block: 256,
is_available: true,
});
}
Ok(())
}
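    /// Registers a simulated SYCL (Intel) device if the SYCL runtime
    /// initializes.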
fn enumerate_sycl_devices(&mut self) -> FFTResult<()> {
if init_sycl_device()? {
#[cfg(target_pointer_width = "32")]
            let (memory_total, memory_free) = (256 * 1024 * 1024, 192 * 1024 * 1024);
            #[cfg(target_pointer_width = "64")]
let (memory_total, memory_free) =
(4usize * 1024 * 1024 * 1024, 3usize * 1024 * 1024 * 1024);
self.devices.push(GPUDeviceInfo {
device_id: 0,
backend: GPUBackend::SYCL,
device_name: "Intel GPU (simulated)".to_string(),
memory_total,
memory_free,
                compute_capability: 1.2,
                compute_units: 96,
max_threads_per_block: 512,
is_available: true,
});
}
Ok(())
}
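    /// Selects up to `max_devices` available devices, ordered according to
    /// the configured distribution strategy.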
fn select_devices(&mut self) -> FFTResult<()> {
self.selected_devices.clear();
let available_devices: Vec<(usize, &GPUDeviceInfo)> = self
.devices
.iter()
.enumerate()
.filter(|(_, device)| device.is_available)
.collect();
if available_devices.is_empty() {
return Err(FFTError::ComputationError(
"No available GPU devices found".to_string(),
));
}
let max_devices = if self._config.max_devices == 0 {
available_devices.len()
} else {
self._config.max_devices.min(available_devices.len())
};
match self._config.distribution {
WorkloadDistribution::Equal => {
for i in 0..max_devices {
self.selected_devices.push(available_devices[i].0);
}
}
WorkloadDistribution::ComputeBased => {
let mut sorted_devices = available_devices;
sorted_devices.sort_by(|a, b| {
b.1.compute_capability
.partial_cmp(&a.1.compute_capability)
.unwrap_or(std::cmp::Ordering::Equal)
});
for i in 0..max_devices {
self.selected_devices.push(sorted_devices[i].0);
}
}
WorkloadDistribution::MemoryBased => {
let mut sorted_devices = available_devices;
sorted_devices.sort_by_key(|item| std::cmp::Reverse(item.1.memory_free));
for i in 0..max_devices {
self.selected_devices.push(sorted_devices[i].0);
}
}
WorkloadDistribution::Manual => {
for i in 0..max_devices {
self.selected_devices.push(available_devices[i].0);
}
}
WorkloadDistribution::Adaptive => {
let available_devices_clone: Vec<(usize, GPUDeviceInfo)> = available_devices
.iter()
.map(|(idx, device)| (*idx, (*device).clone()))
.collect();
self.select_adaptive_devices_with_clone(available_devices_clone, max_devices)?;
}
}
Ok(())
}
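    /// Ranks devices by average observed throughput (the inverse of their
    /// mean recorded execution time), falling back to compute capability
    /// times compute units when no history exists yet.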
fn select_adaptive_devices_with_clone(
&mut self,
available_devices: Vec<(usize, GPUDeviceInfo)>,
max_devices: usize,
) -> FFTResult<()> {
        let performance_history = self
            .performance_history
            .lock()
            .expect("performance history mutex poisoned");
let mut device_scores: Vec<(usize, f64)> = available_devices
.iter()
.map(|(idx, device)| {
let avg_performance = performance_history
.get(&device.device_id)
.map(|times| {
if times.is_empty() {
device.compute_capability as f64 * device.compute_units as f64
} else {
1.0 / (times.iter().sum::<f64>() / times.len() as f64)
}
})
.unwrap_or_else(|| {
device.compute_capability as f64 * device.compute_units as f64
});
(*idx, avg_performance)
})
.collect();
device_scores.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
for i in 0..max_devices {
self.selected_devices.push(device_scores[i].0);
}
Ok(())
}
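    /// Returns every enumerated device, including unavailable ones.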
pub fn get_devices(&self) -> &[GPUDeviceInfo] {
&self.devices
}
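    /// Returns the devices currently selected for computation.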
pub fn get_selected_devices(&self) -> Vec<&GPUDeviceInfo> {
self.selected_devices
.iter()
.map(|&idx| &self.devices[idx])
.collect()
}
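    /// Computes a sparse FFT of `signal`, initializing devices on first use.
    /// Signals shorter than `min_signal_size`, or configurations with a
    /// single selected device, take the single-device path; everything else
    /// is split across all selected devices.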
pub fn sparse_fft<T>(&mut self, signal: &[T]) -> FFTResult<SparseFFTResult>
where
T: NumCast + Copy + Debug + Send + Sync + 'static,
{
if !self.initialized {
self.initialize()?;
}
let signal_len = signal.len();
if signal_len < self._config.min_signal_size || self.selected_devices.len() <= 1 {
return self.single_device_sparse_fft(signal);
}
self.multi_device_sparse_fft(signal)
}
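    /// Runs the entire signal on the first selected device.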
fn single_device_sparse_fft<T>(&mut self, signal: &[T]) -> FFTResult<SparseFFTResult>
where
T: NumCast + Copy + Debug + 'static,
{
let device_idx = self.selected_devices.first().copied().unwrap_or(0);
let device = &self.devices[device_idx];
let gpu_config = GPUSparseFFTConfig {
base_config: self._config.base_config.clone(),
backend: device.backend,
device_id: device.device_id,
..GPUSparseFFTConfig::default()
};
let mut processor = crate::sparse_fft_gpu::GPUSparseFFT::new(gpu_config);
processor.sparse_fft(signal)
}
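    /// Splits the signal into per-device chunks, processes them in parallel,
    /// records per-device timings for adaptive scheduling, and merges the
    /// partial results.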
fn multi_device_sparse_fft<T>(&mut self, signal: &[T]) -> FFTResult<SparseFFTResult>
where
T: NumCast + Copy + Debug + Send + Sync + 'static,
{
let signal_len = signal.len();
let num_devices = self.selected_devices.len();
let chunk_sizes = self.calculate_chunk_sizes(signal_len, num_devices)?;
let chunks = self.split_signal(signal, &chunk_sizes)?;
let chunk_results: Result<Vec<_>, _> = chunks
.par_iter()
.zip(self.selected_devices.par_iter())
.map(|(chunk, &device_idx)| {
let device = &self.devices[device_idx];
let start_time = Instant::now();
let gpu_config = GPUSparseFFTConfig {
base_config: self._config.base_config.clone(),
backend: device.backend,
device_id: device.device_id,
..GPUSparseFFTConfig::default()
};
let mut processor = crate::sparse_fft_gpu::GPUSparseFFT::new(gpu_config);
let result = processor.sparse_fft(chunk);
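                // Record the timing for adaptive scheduling; `try_lock` keeps
                // a contended mutex from ever blocking a worker thread.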
if result.is_ok() {
let execution_time = start_time.elapsed().as_secs_f64();
if let Ok(mut history) = self.performance_history.try_lock() {
history
.entry(device.device_id)
.or_default()
.push(execution_time);
if let Some(times) = history.get_mut(&device.device_id) {
if times.len() > 10 {
times.drain(0..times.len() - 10);
}
}
}
}
result
})
.collect();
let chunk_results = chunk_results?;
self.combine_chunk_results(chunk_results)
}
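    /// Computes per-device chunk sizes for the configured distribution
    /// strategy, so that the sizes always sum to `signal_len`.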
fn calculate_chunk_sizes(
&self,
signal_len: usize,
num_devices: usize,
) -> FFTResult<Vec<usize>> {
let mut chunk_sizes = Vec::with_capacity(num_devices);
match self._config.distribution {
WorkloadDistribution::Equal => {
let base_size = signal_len / num_devices;
let remainder = signal_len % num_devices;
for i in 0..num_devices {
let size = if i < remainder {
base_size + 1
} else {
base_size
};
chunk_sizes.push(size);
}
}
WorkloadDistribution::ComputeBased => {
let total_compute: f32 = self
.selected_devices
.iter()
.map(|&idx| {
self.devices[idx].compute_capability
* self.devices[idx].compute_units as f32
})
.sum();
let mut remaining = signal_len;
for (i, &device_idx) in self.selected_devices.iter().enumerate() {
let device = &self.devices[device_idx];
let device_compute = device.compute_capability * device.compute_units as f32;
let ratio = device_compute / total_compute;
let size = if i == num_devices - 1 {
                        remaining
                    } else {
let size = (signal_len as f32 * ratio) as usize;
remaining = remaining.saturating_sub(size);
size
};
chunk_sizes.push(size);
}
}
WorkloadDistribution::MemoryBased => {
let total_memory: usize = self
.selected_devices
.iter()
.map(|&idx| self.devices[idx].memory_free)
.sum();
let mut remaining = signal_len;
for (i, &device_idx) in self.selected_devices.iter().enumerate() {
let device = &self.devices[device_idx];
let ratio = device.memory_free as f32 / total_memory as f32;
let size = if i == num_devices - 1 {
remaining
} else {
let size = (signal_len as f32 * ratio) as usize;
remaining = remaining.saturating_sub(size);
size
};
chunk_sizes.push(size);
}
}
WorkloadDistribution::Manual => {
if self._config.manual_ratios.len() != num_devices {
return Err(FFTError::ValueError(
"Manual ratios length must match number of selected _devices".to_string(),
));
}
let total_ratio: f32 = self._config.manual_ratios.iter().sum();
let mut remaining = signal_len;
for (i, &ratio) in self._config.manual_ratios.iter().enumerate() {
let size = if i == num_devices - 1 {
remaining
} else {
let size = (signal_len as f32 * ratio / total_ratio) as usize;
remaining = remaining.saturating_sub(size);
size
};
chunk_sizes.push(size);
}
}
            WorkloadDistribution::Adaptive => {
                // `select_devices` already ranked devices adaptively; split
                // evenly here rather than re-dispatching on `Adaptive`, which
                // would recurse without terminating.
                let (base, rem) = (signal_len / num_devices, signal_len % num_devices);
                (0..num_devices).for_each(|i| chunk_sizes.push(base + usize::from(i < rem)));
            }
}
Ok(chunk_sizes)
}
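    /// Copies the signal into contiguous chunks of the requested sizes,
    /// erroring if the sizes overrun the signal.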
fn split_signal<T>(&self, signal: &[T], chunksizes: &[usize]) -> FFTResult<Vec<Vec<T>>>
where
T: Copy,
{
let mut chunks = Vec::new();
let mut offset = 0;
for &chunk_size in chunksizes {
if offset + chunk_size > signal.len() {
return Err(FFTError::ValueError(
"Chunk sizes exceed signal length".to_string(),
));
}
let chunk_end = offset + chunk_size;
let chunk = signal[offset..chunk_end].to_vec();
chunks.push(chunk);
offset = chunk_end;
}
Ok(chunks)
}
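    /// Merges per-chunk results into one, concatenating the sparse spectra
    /// and reporting the longest per-chunk computation time.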
fn combine_chunk_results(
&self,
chunk_results: Vec<SparseFFTResult>,
) -> FFTResult<SparseFFTResult> {
if chunk_results.is_empty() {
return Err(FFTError::ComputationError(
"No chunk _results to combine".to_string(),
));
}
if chunk_results.len() == 1 {
            return Ok(chunk_results.into_iter().next().expect("length checked above"));
}
let max_computation_time = chunk_results
.iter()
.map(|r| r.computation_time)
.max()
.unwrap_or_default();
let mut combined_values = Vec::new();
let mut combined_indices = Vec::new();
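        // Offset each chunk's indices by the number of coefficients emitted
        // so far; this packs the per-chunk spectra consecutively rather than
        // mapping them back onto a global frequency grid.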
let mut index_offset = 0;
for result in chunk_results {
let indices_len = result.indices.len();
combined_values.extend(result.values);
let adjusted_indices: Vec<usize> = result
.indices
.into_iter()
.map(|idx| idx + index_offset)
.collect();
combined_indices.extend(adjusted_indices);
index_offset += indices_len;
}
        // Deduplicate indices (later chunks overwrite earlier ones) and sort
        // the surviving entries by index.
        let mut frequency_map: HashMap<usize, Complex64> = HashMap::new();
        for (idx, value) in combined_indices.iter().zip(combined_values.iter()) {
            frequency_map.insert(*idx, *value);
        }
        let mut sorted_entries: Vec<_> = frequency_map.into_iter().collect();
        sorted_entries.sort_by_key(|&(idx, _)| idx);
        let final_indices: Vec<usize> = sorted_entries.iter().map(|(idx, _)| *idx).collect();
        let final_values: Vec<Complex64> = sorted_entries.iter().map(|(_, val)| *val).collect();
let total_estimated_sparsity = final_values.len();
Ok(SparseFFTResult {
values: final_values,
indices: final_indices,
estimated_sparsity: total_estimated_sparsity,
computation_time: max_computation_time,
algorithm: self._config.base_config.algorithm,
})
}
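    /// Returns a snapshot of the recorded per-device execution times
    /// (in seconds).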
pub fn get_performance_stats(&self) -> HashMap<i32, Vec<f64>> {
        self.performance_history
            .lock()
            .expect("performance history mutex poisoned")
            .clone()
}
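    /// Clears all recorded per-device execution times.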
pub fn reset_performance_history(&mut self) {
        self.performance_history
            .lock()
            .expect("performance history mutex poisoned")
            .clear();
}
}
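/// Convenience wrapper that runs a sparse FFT across all available GPUs with
/// default [`MultiGPUConfig`] settings, recovering the `k` largest frequency
/// components and falling back to the CPU when no GPU is present.
///
/// # Example
///
/// A minimal sketch (marked `ignore` since GPU availability and the enclosing
/// crate path depend on the build environment):
///
/// ```ignore
/// // Length-1024 signal with a single tone at frequency bin 10.
/// let n = 1024;
/// let signal: Vec<f64> = (0..n)
///     .map(|i| (2.0 * std::f64::consts::PI * 10.0 * i as f64 / n as f64).sin())
///     .collect();
/// // Recover the 4 largest components with the default algorithm and window.
/// let result = multi_gpu_sparse_fft(&signal, 4, None, None)?;
/// assert_eq!(result.values.len(), result.indices.len());
/// ```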
#[allow(dead_code)]
pub fn multi_gpu_sparse_fft<T>(
signal: &[T],
k: usize,
algorithm: Option<SparseFFTAlgorithm>,
window_function: Option<WindowFunction>,
) -> FFTResult<SparseFFTResult>
where
T: NumCast + Copy + Debug + Send + Sync + 'static,
{
let base_config = SparseFFTConfig {
sparsity: k,
algorithm: algorithm.unwrap_or(SparseFFTAlgorithm::Sublinear),
window_function: window_function.unwrap_or(WindowFunction::None),
..SparseFFTConfig::default()
};
let config = MultiGPUConfig {
base_config,
..MultiGPUConfig::default()
};
let mut processor = MultiGPUSparseFFT::new(config);
processor.sparse_fft(signal)
}
#[cfg(test)]
mod tests {
use super::*;
use std::f64::consts::PI;
fn create_sparse_signal(n: usize, frequencies: &[(usize, f64)]) -> Vec<f64> {
let mut signal = vec![0.0; n];
for i in 0..n {
let t = 2.0 * PI * (i as f64) / (n as f64);
for &(freq, amp) in frequencies {
signal[i] += amp * (freq as f64 * t).sin();
}
}
signal
}
#[test]
fn test_multi_gpu_initialization() {
let mut processor = MultiGPUSparseFFT::new(MultiGPUConfig::default());
let result = processor.initialize();
assert!(result.is_ok());
assert!(!processor.get_devices().is_empty());
let caps = PlatformCapabilities::detect();
if !caps.cuda_available && !caps.gpu_available {
eprintln!("GPU not available, verifying CPU fallback is present");
assert!(processor
.get_devices()
.iter()
.any(|d| d.backend == GPUBackend::CPUFallback));
}
}
#[test]
fn test_device_enumeration() {
let mut processor = MultiGPUSparseFFT::new(MultiGPUConfig::default());
        processor.initialize().expect("initialization failed");
let devices = processor.get_devices();
assert!(!devices.is_empty());
assert!(devices.iter().any(|d| d.backend == GPUBackend::CPUFallback));
let caps = PlatformCapabilities::detect();
if caps.cuda_available || caps.gpu_available {
eprintln!("GPU available, checking for GPU devices in enumeration");
assert!(!devices.is_empty());
} else {
eprintln!("GPU not available, verifying only CPU fallback present");
assert_eq!(devices.len(), 1);
assert_eq!(devices[0].backend, GPUBackend::CPUFallback);
}
}
#[test]
fn test_multi_gpu_sparse_fft() {
let caps = PlatformCapabilities::detect();
let n = if caps.cuda_available || caps.gpu_available {
            8192
        } else {
            eprintln!("GPU not available, using smaller size for CPU fallback");
            1024
        };
let frequencies = vec![(10, 1.0), (50, 0.5), (100, 0.25)];
let signal = create_sparse_signal(n, &frequencies);
let result = multi_gpu_sparse_fft(
&signal,
6,
Some(SparseFFTAlgorithm::Sublinear),
Some(WindowFunction::Hann),
);
assert!(result.is_ok());
        let result = result.expect("multi-GPU sparse FFT failed");
assert!(!result.values.is_empty());
assert_eq!(result.values.len(), result.indices.len());
}
#[test]
fn test_chunk_size_calculation() {
let config = MultiGPUConfig {
distribution: WorkloadDistribution::Equal,
..MultiGPUConfig::default()
};
let mut processor = MultiGPUSparseFFT::new(config);
processor.selected_devices = vec![0, 1, 2];
        let chunk_sizes = processor
            .calculate_chunk_sizes(1000, 3)
            .expect("chunk size calculation failed");
assert_eq!(chunk_sizes.len(), 3);
assert_eq!(chunk_sizes.iter().sum::<usize>(), 1000);
}
#[test]
fn test_signal_splitting() {
let processor = MultiGPUSparseFFT::new(MultiGPUConfig::default());
let signal = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let chunk_sizes = vec![3, 3, 4];
        let chunks = processor
            .split_signal(&signal, &chunk_sizes)
            .expect("signal splitting failed");
assert_eq!(chunks.len(), 3);
assert_eq!(chunks[0], vec![1, 2, 3]);
assert_eq!(chunks[1], vec![4, 5, 6]);
assert_eq!(chunks[2], vec![7, 8, 9, 10]);
}
}