1use crate::gpu::{async_execution::*, GpuBackend, GpuError};
8use std::collections::HashMap;
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, Instant};
11use thiserror::Error;
12
13#[derive(Error, Debug)]
15pub enum HeterogeneousError {
16 #[error("No suitable compute device found for workload")]
18 NoSuitableDevice,
19
20 #[error("Workload balancing failed: {0}")]
22 BalancingFailed(String),
23
24 #[error("Data migration error: {0}")]
26 DataMigration(String),
27
28 #[error("Execution coordination error: {0}")]
30 ExecutionCoordination(String),
31
32 #[error("Resource exhausted: {0}")]
34 ResourceExhausted(String),
35
36 #[error("GPU error: {0}")]
38 GpuError(#[from] GpuError),
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
43pub enum ComputeDevice {
44 Cpu,
46 Gpu(GpuBackend),
48 Npu,
50 Fpga,
52 Dsp,
54}
55
56impl ComputeDevice {
57 pub fn is_available(&self) -> bool {
59 match self {
60 ComputeDevice::Cpu => true,
61 ComputeDevice::Gpu(backend) => backend.is_available(),
62 ComputeDevice::Npu => false, ComputeDevice::Fpga => false, ComputeDevice::Dsp => false, }
66 }
67
68 pub fn performance_factor(&self, optype: &WorkloadType) -> f64 {
70 match (self, optype) {
71 (ComputeDevice::Cpu, WorkloadType::Sequential) => 1.0,
72 (ComputeDevice::Cpu, WorkloadType::Parallel) => 0.3,
73 (ComputeDevice::Cpu, WorkloadType::VectorizedMath) => 0.2,
74 (ComputeDevice::Cpu, WorkloadType::MatrixOperations) => 0.1,
75 (ComputeDevice::Cpu, WorkloadType::ConvolutionalNN) => 0.05,
76
77 (ComputeDevice::Gpu(_), WorkloadType::Sequential) => 0.1,
78 (ComputeDevice::Gpu(_), WorkloadType::Parallel) => 1.0,
79 (ComputeDevice::Gpu(_), WorkloadType::VectorizedMath) => 1.0,
80 (ComputeDevice::Gpu(_), WorkloadType::MatrixOperations) => 1.0,
81 (ComputeDevice::Gpu(_), WorkloadType::ConvolutionalNN) => 1.0,
82
83 (ComputeDevice::Npu, WorkloadType::ConvolutionalNN) => 1.5,
84 (ComputeDevice::Npu, WorkloadType::MatrixOperations) => 1.2,
85 (ComputeDevice::Npu, _) => 0.8,
86 _ => 0.5, }
88 }
89}
90
/// Coarse category of a workload, used to look up per-device efficiency factors.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum WorkloadType {
    /// Inherently serial work.
    Sequential,
    /// Generic data-parallel work.
    Parallel,
    /// Element-wise vector arithmetic.
    VectorizedMath,
    /// Dense matrix kernels such as GEMM.
    MatrixOperations,
    /// Sparse linear-algebra kernels.
    SparseOperations,
    /// Convolution layers of a neural network.
    ConvolutionalNN,
    /// Work dominated by memory traffic rather than arithmetic.
    MemoryIntensive,
    /// Any other category, identified by a free-form name.
    Custom(String),
}
111
112#[derive(Debug, Clone)]
114pub struct WorkloadCharacteristics {
115 pub workload_type: WorkloadType,
117 pub problemsize: usize,
119 pub memory_requirement: usize,
121 pub computational_intensity: f64,
123 pub data_locality: f64,
125 pub parallelization_factor: f64,
127 pub preferred_datatypes: Vec<String>,
129}
130
131impl WorkloadCharacteristics {
132 pub fn matrix_multiply(m: usize, n: usize, k: usize) -> Self {
134 Self {
135 workload_type: WorkloadType::MatrixOperations,
136 problemsize: m * n * k,
137 memory_requirement: (m * k + k * n + m * n) * 8, computational_intensity: (2.0 * k as f64) / 3.0, data_locality: 0.7, parallelization_factor: 0.9, preferred_datatypes: vec!["f32".to_string(), "f16".to_string()],
142 }
143 }
144
145 pub fn size(
147 batch_size: usize,
148 channels: usize,
149 height: usize,
150 width: usize,
151 kernel_size: usize,
152 ) -> Self {
153 let input_size = batch_size * channels * height * width;
154 let output_size = batch_size * channels * height * width; Self {
157 workload_type: WorkloadType::ConvolutionalNN,
158 problemsize: input_size * kernel_size * kernel_size,
159 memory_requirement: (input_size + output_size) * 4, computational_intensity: (kernel_size * kernel_size * 2) as f64,
161 data_locality: 0.8, parallelization_factor: 0.95, preferred_datatypes: vec!["f16".to_string(), "i8".to_string()],
164 }
165 }
166
167 pub fn element(size: usize, ops_perelement: usize) -> Self {
169 Self {
170 workload_type: WorkloadType::VectorizedMath,
171 problemsize: size,
172 memory_requirement: size * 8, computational_intensity: ops_perelement as f64 / 2.0, data_locality: 1.0, parallelization_factor: 1.0, preferred_datatypes: vec!["f32".to_string(), "f64".to_string()],
177 }
178 }
179}
180
181#[derive(Debug, Clone)]
183pub struct DeviceCharacteristics {
184 pub device: ComputeDevice,
186 pub peak_gflops: f64,
188 pub memorybandwidth: f64,
190 pub available_memory: usize,
192 pub compute_units: usize,
194 pub power_consumption: f64,
196 pub transfer_overhead: Duration,
198}
199
200impl DeviceCharacteristics {
201 pub fn typical_cpu() -> Self {
203 Self {
204 device: ComputeDevice::Cpu,
205 peak_gflops: 200.0, memorybandwidth: 50.0, available_memory: 16 * 1024 * 1024 * 1024, compute_units: 8, power_consumption: 95.0, transfer_overhead: Duration::from_nanos(0), }
212 }
213
214 pub fn typical_gpu() -> Self {
216 Self {
217 device: ComputeDevice::Gpu(GpuBackend::Cuda),
218 peak_gflops: 10000.0, memorybandwidth: 900.0, available_memory: 12 * 1024 * 1024 * 1024, compute_units: 80, power_consumption: 350.0, transfer_overhead: Duration::from_micros(10), }
225 }
226
227 pub fn estimateexecution_time(&self, workload: &WorkloadCharacteristics) -> Duration {
229 let performance_factor = self.device.performance_factor(&workload.workload_type);
230
231 let compute_time =
233 (workload.problemsize as f64) / (self.peak_gflops * 1e9 * performance_factor);
234
235 let memory_time = (workload.memory_requirement as f64) / (self.memorybandwidth * 1e9);
236
237 let execution_time = compute_time.max(memory_time) + self.transfer_overhead.as_secs_f64();
239
240 Duration::from_secs_f64(execution_time)
241 }
242}
243
244#[derive(Debug, Clone)]
246pub enum ExecutionStrategy {
247 CpuOnly,
249 GpuOnly(GpuBackend),
251 CpuGpuSplit {
253 cpu_fraction: f64,
254 gpu_backend: GpuBackend,
255 },
256 MultiDevice(HashMap<ComputeDevice, f64>),
258 Automatic,
260}
261
262pub struct HeterogeneousScheduler {
264 available_devices: Vec<DeviceCharacteristics>,
265 performance_history: Arc<Mutex<HashMap<String, Duration>>>,
266 #[allow(dead_code)]
267 async_manager: AsyncGpuManager,
268}
269
270impl HeterogeneousScheduler {
271 pub fn new() -> Self {
273 let mut available_devices = vec![DeviceCharacteristics::typical_cpu()];
274
275 for backend in [GpuBackend::Cuda, GpuBackend::Rocm, GpuBackend::Metal] {
277 if backend.is_available() {
278 let mut gpu_chars = DeviceCharacteristics::typical_gpu();
279 gpu_chars.device = ComputeDevice::Gpu(backend);
280 available_devices.push(gpu_chars);
281 }
282 }
283
284 Self {
285 available_devices,
286 performance_history: Arc::new(Mutex::new(HashMap::new())),
287 async_manager: AsyncGpuManager::new(),
288 }
289 }
290
291 pub fn available_devices(&self) -> &[DeviceCharacteristics] {
293 &self.available_devices
294 }
295
296 pub fn select_strategy(
298 &self,
299 workload: &WorkloadCharacteristics,
300 ) -> Result<ExecutionStrategy, HeterogeneousError> {
301 if self.available_devices.is_empty() {
302 return Err(HeterogeneousError::NoSuitableDevice);
303 }
304
305 let mut device_times: Vec<_> = self
307 .available_devices
308 .iter()
309 .map(|device| {
310 let time = device.estimateexecution_time(workload);
311 (device.device, time)
312 })
313 .collect();
314
315 device_times.sort_by_key(|(_, time)| *time);
317
318 let best_device = device_times[0].0;
319 let best_time = device_times[0].1;
320
321 if device_times.len() >= 2 {
323 let second_best_time = device_times[1].1;
324
325 if best_time.as_secs_f64() * 1.5 > second_best_time.as_secs_f64() {
327 if let (ComputeDevice::Cpu, ComputeDevice::Gpu(backend)) =
328 (device_times[0].0, device_times[1].0)
329 {
330 return Ok(ExecutionStrategy::CpuGpuSplit {
331 cpu_fraction: 0.3,
332 gpu_backend: backend,
333 });
334 } else if let (ComputeDevice::Gpu(backend), ComputeDevice::Cpu) =
335 (device_times[0].0, device_times[1].0)
336 {
337 return Ok(ExecutionStrategy::CpuGpuSplit {
338 cpu_fraction: 0.3,
339 gpu_backend: backend,
340 });
341 }
342 }
343 }
344
345 match best_device {
347 ComputeDevice::Cpu => Ok(ExecutionStrategy::CpuOnly),
348 ComputeDevice::Gpu(backend) => Ok(ExecutionStrategy::GpuOnly(backend)),
349 ComputeDevice::Npu => Ok(ExecutionStrategy::CpuOnly), ComputeDevice::Fpga => Ok(ExecutionStrategy::CpuOnly), ComputeDevice::Dsp => Ok(ExecutionStrategy::CpuOnly), }
353 }
354
355 pub fn execute_workload<F, R>(
357 &self,
358 workload: &WorkloadCharacteristics,
359 strategy: ExecutionStrategy,
360 work_fn: F,
361 ) -> Result<R, HeterogeneousError>
362 where
363 F: FnOnce(&ExecutionStrategy) -> Result<R, HeterogeneousError>,
364 {
365 let start_time = Instant::now();
366
367 let result = work_fn(&strategy)?;
368
369 let execution_time = start_time.elapsed();
370
371 let key = format!(
373 "{workload_type:?}_{problemsize}",
374 workload_type = workload.workload_type,
375 problemsize = workload.problemsize
376 );
377 self.performance_history
378 .lock()
379 .expect("Operation failed")
380 .insert(key, execution_time);
381
382 Ok(result)
383 }
384
385 pub fn get_performance_stats(&self) -> HeterogeneousStats {
387 let history = self.performance_history.lock().expect("Operation failed");
388
389 let total_executions = history.len();
390 let avgexecution_time = if total_executions > 0 {
391 let total_time: Duration = history.values().sum();
392 total_time / total_executions as u32
393 } else {
394 Duration::ZERO
395 };
396
397 HeterogeneousStats {
398 available_devices: self.available_devices.len(),
399 total_executions,
400 avgexecution_time,
401 device_utilization: self.calculate_device_utilization(),
402 }
403 }
404
405 fn calculate_device_utilization(&self) -> HashMap<ComputeDevice, f64> {
407 let mut utilization = HashMap::new();
410 for device in &self.available_devices {
411 utilization.insert(device.device, 0.0);
412 }
413 utilization
414 }
415
416 pub fn optimize_strategy(
418 &self,
419 workload: &WorkloadCharacteristics,
420 current_strategy: ExecutionStrategy,
421 ) -> ExecutionStrategy {
422 let key = format!(
423 "{workload_type:?}_{problemsize}",
424 workload_type = workload.workload_type,
425 problemsize = workload.problemsize
426 );
427 let history = self.performance_history.lock().expect("Operation failed");
428
429 if let Some(&_historical_time) = history.get(&key) {
431 return current_strategy;
435 }
436
437 current_strategy
438 }
439}
440
441impl Default for HeterogeneousScheduler {
442 fn default() -> Self {
443 Self::new()
444 }
445}
446
447#[derive(Debug, Clone)]
449pub struct HeterogeneousStats {
450 pub available_devices: usize,
452 pub total_executions: usize,
454 pub avgexecution_time: Duration,
456 pub device_utilization: HashMap<ComputeDevice, f64>,
458}
459
460#[derive(Debug, Clone)]
462pub struct WorkloadDistribution {
463 pub assignments: HashMap<ComputeDevice, f64>,
465 pub partitioning: PartitioningStrategy,
467 pub coordination: CoordinationStrategy,
469}
470
/// How a two-dimensional problem is divided between devices.
#[derive(Debug, Clone)]
pub enum PartitioningStrategy {
    /// Each device receives a contiguous range of rows.
    RowSplit,
    /// Each device receives a contiguous range of columns.
    ColumnSplit,
    /// The problem is tiled into blocks of `block_size` (presumably rows × columns
    /// — TODO confirm) that are handed out to devices.
    BlockSplit { block_size: (usize, usize) },
    /// Any other scheme, identified by a free-form name.
    Custom(String),
}
483
/// How devices working on parts of one workload are kept in step.
#[derive(Debug, Clone)]
pub enum CoordinationStrategy {
    /// All devices run a phase, then wait for each other before the next one.
    BulkSynchronous,
    /// Devices run independently and signal progress through events.
    AsyncWithEvents,
    /// Devices form stages through which data flows.
    Pipeline,
    /// Any other scheme, identified by a free-form name.
    Custom(String),
}
496
497pub mod patterns {
499 use super::*;
500
501 pub fn heterogeneous_map<T, F>(
503 scheduler: &HeterogeneousScheduler,
504 data: &[T],
505 map_fn: F,
506 ) -> Result<Vec<T>, HeterogeneousError>
507 where
508 T: Clone + Send + Sync,
509 F: Fn(&T) -> T + Send + Sync,
510 {
511 let workload = WorkloadCharacteristics::element(data.len(), 1);
512 let strategy = scheduler.select_strategy(&workload)?;
513
514 scheduler.execute_workload(&workload, strategy, |_strategy| {
515 Ok(data.iter().map(map_fn).collect())
517 })
518 }
519
520 pub fn heterogeneous_reduce<T, F>(
522 scheduler: &HeterogeneousScheduler,
523 data: &[T],
524 initial: T,
525 reduce_fn: F,
526 ) -> Result<T, HeterogeneousError>
527 where
528 T: Clone + Send + Sync,
529 F: Fn(T, &T) -> T + Send + Sync,
530 {
531 let workload = WorkloadCharacteristics::element(data.len(), 1);
532 let strategy = scheduler.select_strategy(&workload)?;
533
534 scheduler.execute_workload(&workload, strategy, |_strategy| {
535 Ok(data.iter().fold(initial, reduce_fn))
536 })
537 }
538}
539
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_device_availability() {
        let host = ComputeDevice::Cpu;
        assert!(host.is_available());
    }

    #[test]
    fn test_workload_characteristics() {
        let dim = 1000;
        let gemm = WorkloadCharacteristics::matrix_multiply(dim, dim, dim);
        assert_eq!(WorkloadType::MatrixOperations, gemm.workload_type);
        assert!(gemm.computational_intensity > 0.0);
    }

    #[test]
    fn test_device_characteristics() {
        let (cpu, gpu) = (
            DeviceCharacteristics::typical_cpu(),
            DeviceCharacteristics::typical_gpu(),
        );

        assert_eq!(ComputeDevice::Cpu, cpu.device);
        assert!(matches!(gpu.device, ComputeDevice::Gpu(_)));
        assert!(cpu.peak_gflops < gpu.peak_gflops);
    }

    #[test]
    fn testexecution_time_estimation() {
        let workload = WorkloadCharacteristics::element(1_000_000, 1);
        let estimate = DeviceCharacteristics::typical_cpu().estimateexecution_time(&workload);
        assert!(estimate > Duration::ZERO);
    }

    #[test]
    fn test_scheduler_creation() {
        let has_no_devices = HeterogeneousScheduler::new().available_devices().is_empty();
        assert!(!has_no_devices);
    }

    #[test]
    fn test_strategy_selection() {
        let scheduler = HeterogeneousScheduler::new();
        let gemm = WorkloadCharacteristics::matrix_multiply(100, 100, 100);
        assert!(scheduler.select_strategy(&gemm).is_ok());
    }
}