use crate::error::{ClusteringError, Result};
use scirs2_core::ndarray::{Array1, Array2, ArrayView2, Axis};
use scirs2_core::numeric::{Float, FromPrimitive};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::time::{Duration, Instant};
use super::core::{DeviceSelection, GpuBackend, GpuConfig, GpuContext, GpuDevice};
use super::memory::{GpuMemoryBlock, GpuMemoryManager, MemoryStats, MemoryStrategy};
/// Policy controlling how GPU memory is requested and reused by
/// `AdvancedGpuMemoryManager`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AdvancedMemoryStrategy {
/// Compact pooled memory before allocating when the budget would be exceeded.
Conservative,
/// Allocate directly with no pressure checks.
Aggressive,
/// Switch between conservative and aggressive behavior based on measured
/// memory pressure (see `AdvancedGpuMemoryManager::pressure_threshold`).
Adaptive,
/// Cap each allocation at `chunk_size` bytes for streamed processing.
Streaming {
chunk_size: usize,
},
/// Unified (host/device shared) memory. NOTE(review): currently allocated
/// identically to the aggressive path — confirm intended semantics.
Unified,
/// Delegate to a fixed-size allocation pool of `pool_size` bytes.
Pool {
pool_size: usize,
},
}
impl Default for AdvancedMemoryStrategy {
fn default() -> Self {
AdvancedMemoryStrategy::Adaptive
}
}
impl fmt::Display for AdvancedMemoryStrategy {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AdvancedMemoryStrategy::Conservative => write!(f, "Conservative"),
AdvancedMemoryStrategy::Aggressive => write!(f, "Aggressive"),
AdvancedMemoryStrategy::Adaptive => write!(f, "Adaptive"),
AdvancedMemoryStrategy::Streaming { chunk_size } => {
write!(f, "Streaming({}MB)", chunk_size / (1024 * 1024))
}
AdvancedMemoryStrategy::Unified => write!(f, "Unified"),
AdvancedMemoryStrategy::Pool { pool_size } => {
write!(f, "Pool({}MB)", pool_size / (1024 * 1024))
}
}
}
}
/// GPU memory manager layering allocation strategies, usage accounting,
/// and an allocation history on top of the basic `GpuMemoryManager`.
#[derive(Debug)]
pub struct AdvancedGpuMemoryManager {
// Underlying pooled allocator that performs the real (de)allocations.
base_manager: GpuMemoryManager,
// Active allocation policy.
strategy: AdvancedMemoryStrategy,
// Total device memory budget, in bytes.
available_memory: usize,
// Record of every allocation attempt. NOTE(review): append-only and
// never read back or trimmed — grows without bound.
allocation_history: Vec<AllocationRecord>,
// Pressure ratio (in-use / available) above which the adaptive strategy
// falls back to conservative allocation.
pressure_threshold: f64,
// Whether memory compaction is permitted. NOTE(review): only exposed via
// accessors; the allocation paths do not currently consult it.
enable_defrag: bool,
// Running allocation statistics exposed via `get_stats`.
usage_stats: MemoryUsageStats,
}
/// One allocation attempt recorded by `AdvancedGpuMemoryManager`.
#[derive(Debug, Clone)]
pub struct AllocationRecord {
/// Requested size in bytes.
pub size: usize,
/// When the allocation was attempted.
pub timestamp: Instant,
/// Lifetime of the block, when known. NOTE(review): always set to `None`
/// by `allocate` and never updated afterwards.
pub duration: Option<Duration>,
/// Whether the underlying allocation succeeded.
pub success: bool,
}
/// Aggregate allocation statistics maintained by `AdvancedGpuMemoryManager`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MemoryUsageStats {
/// Number of allocation attempts, successful or not.
pub total_allocations: usize,
/// Attempts that returned a block.
pub successful_allocations: usize,
/// Attempts that failed.
pub failed_allocations: usize,
/// Cumulative bytes handed out over the manager's lifetime.
pub total_bytes_allocated: usize,
/// Bytes currently outstanding (allocated minus deallocated).
pub current_bytes_in_use: usize,
/// High-water mark of `current_bytes_in_use`.
pub peak_bytes_in_use: usize,
/// Mean size of successful allocations, in bytes.
pub avg_allocation_size: f64,
/// Fraction of attempts that succeeded (0.0..=1.0).
pub efficiency: f64,
}
impl AdvancedGpuMemoryManager {
    /// Maximum number of retained allocation records; once reached, the
    /// oldest half is dropped so a long-lived manager's history cannot
    /// grow without bound.
    const MAX_HISTORY: usize = 4096;

    /// Creates a manager applying `strategy` against a budget of
    /// `available_memory` bytes.
    ///
    /// A `Pool` strategy sizes the underlying pool from its `pool_size`
    /// (converted to MB); every other strategy uses a 100 MB default pool.
    pub fn new(strategy: AdvancedMemoryStrategy, available_memory: usize) -> Self {
        // 256-byte alignment matches typical GPU memory transaction sizes.
        let alignment = 256;
        let max_pool_size = match strategy {
            AdvancedMemoryStrategy::Pool { pool_size } => pool_size / (1024 * 1024),
            _ => 100,
        };
        Self {
            base_manager: GpuMemoryManager::new(alignment, max_pool_size),
            strategy,
            available_memory,
            allocation_history: Vec::new(),
            pressure_threshold: 0.85,
            enable_defrag: true,
            usage_stats: MemoryUsageStats::default(),
        }
    }

    /// Allocates `size` bytes according to the active strategy, recording
    /// the attempt in the history and updating usage statistics.
    ///
    /// # Errors
    /// Propagates any failure from the underlying `GpuMemoryManager`.
    pub fn allocate(&mut self, size: usize) -> Result<GpuMemoryBlock> {
        self.usage_stats.total_allocations += 1;
        let memory_pressure = self.calculate_memory_pressure();
        let result = match self.strategy {
            AdvancedMemoryStrategy::Conservative => self.allocate_conservative(size),
            AdvancedMemoryStrategy::Aggressive => self.allocate_aggressive(size),
            AdvancedMemoryStrategy::Adaptive => self.allocate_adaptive(size, memory_pressure),
            AdvancedMemoryStrategy::Streaming { chunk_size } => {
                self.allocate_streaming(size, chunk_size)
            }
            AdvancedMemoryStrategy::Unified => self.allocate_unified(size),
            AdvancedMemoryStrategy::Pool { .. } => self.base_manager.allocate(size),
        };
        let success = result.is_ok();
        // Bound the history: drop the oldest half in one batch so the
        // amortized cost per allocation stays O(1).
        if self.allocation_history.len() >= Self::MAX_HISTORY {
            self.allocation_history.drain(..Self::MAX_HISTORY / 2);
        }
        self.allocation_history.push(AllocationRecord {
            size,
            timestamp: Instant::now(),
            duration: None,
            success,
        });
        if success {
            self.usage_stats.successful_allocations += 1;
            self.usage_stats.total_bytes_allocated += size;
            self.usage_stats.current_bytes_in_use += size;
            self.usage_stats.peak_bytes_in_use = self
                .usage_stats
                .peak_bytes_in_use
                .max(self.usage_stats.current_bytes_in_use);
        } else {
            self.usage_stats.failed_allocations += 1;
        }
        self.update_efficiency();
        result
    }

    /// Returns `block` to the underlying manager and updates accounting.
    pub fn deallocate(&mut self, block: GpuMemoryBlock) -> Result<()> {
        let size = block.size;
        self.base_manager.deallocate(block)?;
        self.usage_stats.current_bytes_in_use =
            self.usage_stats.current_bytes_in_use.saturating_sub(size);
        Ok(())
    }

    /// Conservative path: when the request would exceed the memory budget,
    /// compact pooled memory first — but only if defragmentation is
    /// enabled (previously the `enable_defrag` flag was never consulted).
    fn allocate_conservative(&mut self, size: usize) -> Result<GpuMemoryBlock> {
        // saturating_add avoids overflow for pathological request sizes.
        let projected = self.usage_stats.current_bytes_in_use.saturating_add(size);
        if projected > self.available_memory && self.enable_defrag {
            self.compact_memory()?;
        }
        self.base_manager.allocate(size)
    }

    /// Aggressive path: allocate immediately with no pressure checks.
    fn allocate_aggressive(&mut self, size: usize) -> Result<GpuMemoryBlock> {
        self.base_manager.allocate(size)
    }

    /// Adaptive path: conservative under high memory pressure, aggressive
    /// otherwise.
    fn allocate_adaptive(&mut self, size: usize, memory_pressure: f64) -> Result<GpuMemoryBlock> {
        if memory_pressure > self.pressure_threshold {
            self.allocate_conservative(size)
        } else {
            self.allocate_aggressive(size)
        }
    }

    /// Streaming path: cap each allocation at the configured chunk size.
    /// NOTE(review): callers may receive a block smaller than requested —
    /// confirm they handle partial allocations.
    fn allocate_streaming(&mut self, size: usize, chunk_size: usize) -> Result<GpuMemoryBlock> {
        let actual_size = size.min(chunk_size);
        self.base_manager.allocate(actual_size)
    }

    /// Unified-memory path; currently identical to a plain allocation.
    fn allocate_unified(&mut self, size: usize) -> Result<GpuMemoryBlock> {
        self.base_manager.allocate(size)
    }

    /// Ratio of bytes in use to the total budget; reports full pressure
    /// (1.0) when no budget is configured.
    fn calculate_memory_pressure(&self) -> f64 {
        if self.available_memory == 0 {
            return 1.0;
        }
        self.usage_stats.current_bytes_in_use as f64 / self.available_memory as f64
    }

    /// Releases pooled memory back to the device.
    fn compact_memory(&mut self) -> Result<()> {
        self.base_manager.clear_pools()
    }

    /// Recomputes derived statistics: success ratio and mean request size.
    fn update_efficiency(&mut self) {
        if self.usage_stats.total_allocations > 0 {
            self.usage_stats.efficiency = self.usage_stats.successful_allocations as f64
                / self.usage_stats.total_allocations as f64;
            self.usage_stats.avg_allocation_size = self.usage_stats.total_bytes_allocated as f64
                / self.usage_stats.successful_allocations.max(1) as f64;
        }
    }

    /// Current usage statistics.
    pub fn get_stats(&self) -> &MemoryUsageStats {
        &self.usage_stats
    }

    /// The active allocation strategy.
    pub fn strategy(&self) -> AdvancedMemoryStrategy {
        self.strategy
    }

    /// Switches the allocation strategy for subsequent requests.
    pub fn set_strategy(&mut self, strategy: AdvancedMemoryStrategy) {
        self.strategy = strategy;
    }

    /// Pressure threshold used by the adaptive strategy.
    pub fn pressure_threshold(&self) -> f64 {
        self.pressure_threshold
    }

    /// Sets the adaptive pressure threshold, clamped to [0, 1].
    pub fn set_pressure_threshold(&mut self, threshold: f64) {
        self.pressure_threshold = threshold.clamp(0.0, 1.0);
    }

    /// Whether compaction is permitted on the conservative path.
    pub fn is_defrag_enabled(&self) -> bool {
        self.enable_defrag
    }

    /// Enables or disables compaction on the conservative path.
    pub fn set_defrag_enabled(&mut self, enabled: bool) {
        self.enable_defrag = enabled;
    }
}
/// Numeric precision used for GPU compute.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PrecisionMode {
/// 64-bit floating point.
Full,
/// 32-bit floating point.
Single,
/// 16-bit floating point.
Half,
/// Mixed f16/f32 precision.
Mixed,
/// bfloat16 format.
BFloat16,
/// NVIDIA TensorFloat-32.
TensorFloat32,
/// Let the runtime choose a precision for the detected hardware.
Auto,
}
impl Default for PrecisionMode {
fn default() -> Self {
PrecisionMode::Auto
}
}
impl fmt::Display for PrecisionMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PrecisionMode::Full => write!(f, "Full (f64)"),
PrecisionMode::Single => write!(f, "Single (f32)"),
PrecisionMode::Half => write!(f, "Half (f16)"),
PrecisionMode::Mixed => write!(f, "Mixed (f16/f32)"),
PrecisionMode::BFloat16 => write!(f, "BFloat16"),
PrecisionMode::TensorFloat32 => write!(f, "TF32"),
PrecisionMode::Auto => write!(f, "Auto"),
}
}
}
/// Configuration for tensor-core (matrix unit) usage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TensorCoreConfig {
/// Whether tensor cores may be used at all.
pub enabled: bool,
/// Requested compute precision.
pub precision: PrecisionMode,
/// Matrix tile dimensions (M, N, K).
pub tile_size: (usize, usize, usize),
/// Whether to exploit structured sparsity.
pub use_sparsity: bool,
/// Target sparsity fraction when `use_sparsity` is on.
pub sparsity_ratio: f64,
/// Automatically adjust `loss_scale` under reduced precision.
pub auto_scale: bool,
/// Loss-scaling factor applied under reduced precision.
pub loss_scale: f64,
}
impl Default for TensorCoreConfig {
    /// Tensor cores enabled with automatic precision selection, a standard
    /// 16x16x16 tile, loss scaling at 1.0, and sparsity disabled.
    fn default() -> Self {
        Self {
            enabled: true,
            auto_scale: true,
            loss_scale: 1.0,
            precision: PrecisionMode::Auto,
            tile_size: (16, 16, 16),
            use_sparsity: false,
            sparsity_ratio: 0.5,
        }
    }
}
/// Tensor-core features reported for a device.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TensorCoreCapabilities {
/// Whether the device exposes tensor/matrix cores at all.
pub available: bool,
/// Precisions the matrix units can compute in.
pub supported_precisions: Vec<PrecisionMode>,
/// Supported (M, N, K) tile shapes.
pub supported_tile_sizes: Vec<(usize, usize, usize)>,
/// Whether structured sparsity acceleration is supported.
pub supports_sparsity: bool,
/// Peak throughput in TOPS, when known.
pub peak_tops: Option<f64>,
/// Name of the matrix-unit architecture.
pub architecture: String,
}
impl Default for TensorCoreCapabilities {
    /// Conservative baseline for unknown hardware: no tensor cores, plain
    /// single precision, one generic tile size, and no sparsity support.
    fn default() -> Self {
        Self {
            available: false,
            supports_sparsity: false,
            peak_tops: None,
            supported_precisions: vec![PrecisionMode::Single],
            supported_tile_sizes: vec![(16, 16, 16)],
            architecture: String::from("Unknown"),
        }
    }
}
/// Reports the matrix/tensor acceleration features of `device`, keyed off
/// its backend. The figures are fixed per backend; unrecognized backends
/// fall back to the conservative defaults.
pub fn detect_tensor_core_capabilities(device: &GpuDevice) -> TensorCoreCapabilities {
    // (precisions, tile sizes, sparsity support, peak TOPS, architecture)
    let (precisions, tiles, sparsity, tops, arch) = match device.backend {
        GpuBackend::Cuda => (
            vec![
                PrecisionMode::Half,
                PrecisionMode::Mixed,
                PrecisionMode::BFloat16,
                PrecisionMode::TensorFloat32,
            ],
            vec![(16, 16, 16), (32, 8, 16), (8, 32, 16)],
            true,
            312.0,
            "NVIDIA Tensor Cores",
        ),
        GpuBackend::Rocm => (
            vec![
                PrecisionMode::Half,
                PrecisionMode::Mixed,
                PrecisionMode::BFloat16,
            ],
            vec![(32, 32, 8), (16, 16, 16)],
            false,
            383.0,
            "AMD Matrix Cores",
        ),
        GpuBackend::Metal => (
            vec![PrecisionMode::Half, PrecisionMode::Single],
            vec![(16, 16, 16)],
            false,
            15.8,
            "Apple Neural Engine",
        ),
        _ => return TensorCoreCapabilities::default(),
    };
    TensorCoreCapabilities {
        available: true,
        supported_precisions: precisions,
        supported_tile_sizes: tiles,
        supports_sparsity: sparsity,
        peak_tops: Some(tops),
        architecture: arch.to_string(),
    }
}
/// Device-selection policies understood by `DeviceSelector`. A superset of
/// the basic `DeviceSelection` (see the `From` impl for the mapping).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AdvancedDeviceSelection {
/// First registered device.
First,
/// Device with the most available memory.
MostMemory,
/// Device with the most compute units.
HighestCompute,
/// Device with this exact id.
Specific(u32),
/// Highest score from `GpuDevice::get_device_score`.
Auto,
/// Lowest benchmarked k-means time (falls back to first without benchmarks).
Fastest,
/// Best benchmarked throughput-per-watt (requires power measurements).
MostEfficient,
/// Rotate through devices on each selection.
RoundRobin,
/// Device with the lowest reported utilization.
LoadBalanced,
/// Use up to `max_gpus` devices; selection returns the primary one.
MultiGpu {
max_gpus: usize,
},
}
impl Default for AdvancedDeviceSelection {
fn default() -> Self {
AdvancedDeviceSelection::Auto
}
}
impl From<AdvancedDeviceSelection> for DeviceSelection {
    /// Lowers an advanced selection policy onto the basic `DeviceSelection`
    /// set. Policies with no basic counterpart (efficiency, round-robin,
    /// load balancing, multi-GPU) intentionally degrade to `Auto`.
    fn from(adv: AdvancedDeviceSelection) -> Self {
        match adv {
            AdvancedDeviceSelection::First => DeviceSelection::First,
            AdvancedDeviceSelection::MostMemory => DeviceSelection::MostMemory,
            AdvancedDeviceSelection::HighestCompute => DeviceSelection::HighestCompute,
            AdvancedDeviceSelection::Specific(id) => DeviceSelection::Specific(id),
            AdvancedDeviceSelection::Auto => DeviceSelection::Auto,
            AdvancedDeviceSelection::Fastest => DeviceSelection::Fastest,
            // Listed explicitly (rather than a `_` catch-all) so adding a
            // new variant forces a conscious mapping decision at compile time.
            AdvancedDeviceSelection::MostEfficient
            | AdvancedDeviceSelection::RoundRobin
            | AdvancedDeviceSelection::LoadBalanced
            | AdvancedDeviceSelection::MultiGpu { .. } => DeviceSelection::Auto,
        }
    }
}
/// Chooses among registered GPU devices according to a selection strategy,
/// optionally informed by utilization reports and benchmark results.
#[derive(Debug)]
pub struct DeviceSelector {
// Registered devices, in insertion order.
devices: Vec<GpuDevice>,
// Active selection policy.
strategy: AdvancedDeviceSelection,
// Last reported utilization per device id (clamped to 0.0..=1.0).
utilization: HashMap<u32, f64>,
// Cursor for the round-robin policy.
round_robin_idx: usize,
// Benchmark results per device id, used by Fastest / MostEfficient.
benchmarks: HashMap<u32, DeviceBenchmark>,
}
/// Measured performance figures for one device.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceBenchmark {
/// Id of the benchmarked device.
pub device_id: u32,
/// Distance computations per second (units per caller convention).
pub distance_throughput: f64,
/// Wall-clock time of the k-means benchmark, in milliseconds.
pub kmeans_time_ms: f64,
/// Measured memory bandwidth.
pub memory_bandwidth: f64,
/// Power draw during the benchmark, when measured (required for the
/// `MostEfficient` selection policy).
pub power_consumption: Option<f64>,
/// When the benchmark was taken.
pub timestamp: std::time::SystemTime,
}
impl DeviceSelector {
    /// Creates an empty selector that will apply `strategy` once devices
    /// are registered via `add_device`.
    pub fn new(strategy: AdvancedDeviceSelection) -> Self {
        Self {
            devices: Vec::new(),
            strategy,
            utilization: HashMap::new(),
            round_robin_idx: 0,
            benchmarks: HashMap::new(),
        }
    }

    /// Registers `device` and starts tracking its utilization at 0.0.
    pub fn add_device(&mut self, device: GpuDevice) {
        self.utilization.insert(device.device_id, 0.0);
        self.devices.push(device);
    }

    /// Picks a device according to the active strategy, or `None` when no
    /// suitable device exists. `RoundRobin` advances internal state, which
    /// is why this takes `&mut self`.
    pub fn select_device(&mut self) -> Option<&GpuDevice> {
        if self.devices.is_empty() {
            return None;
        }
        match &self.strategy {
            AdvancedDeviceSelection::First => self.devices.first(),
            AdvancedDeviceSelection::MostMemory => {
                self.devices.iter().max_by_key(|d| d.available_memory)
            }
            AdvancedDeviceSelection::HighestCompute => {
                self.devices.iter().max_by_key(|d| d.compute_units)
            }
            AdvancedDeviceSelection::Specific(id) => {
                self.devices.iter().find(|d| d.device_id == *id)
            }
            AdvancedDeviceSelection::Auto => self.devices.iter().max_by(|a, b| {
                a.get_device_score()
                    .partial_cmp(&b.get_device_score())
                    .unwrap_or(std::cmp::Ordering::Equal)
            }),
            AdvancedDeviceSelection::Fastest => {
                if self.benchmarks.is_empty() {
                    // No measurements yet: fall back to the first device.
                    self.devices.first()
                } else {
                    let fastest_id = self
                        .benchmarks
                        .iter()
                        .min_by(|a, b| {
                            a.1.kmeans_time_ms
                                .partial_cmp(&b.1.kmeans_time_ms)
                                .unwrap_or(std::cmp::Ordering::Equal)
                        })
                        .map(|(id, _)| *id);
                    fastest_id.and_then(|id| self.devices.iter().find(|d| d.device_id == id))
                }
            }
            AdvancedDeviceSelection::MostEfficient => {
                if self.benchmarks.is_empty() {
                    self.devices.first()
                } else {
                    // Efficiency = throughput per watt; devices without a
                    // power measurement are excluded from consideration.
                    let most_efficient_id = self
                        .benchmarks
                        .iter()
                        .filter_map(|(id, bench)| {
                            bench
                                .power_consumption
                                .map(|power| (*id, bench.distance_throughput / power))
                        })
                        .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
                        .map(|(id, _)| id);
                    most_efficient_id.and_then(|id| self.devices.iter().find(|d| d.device_id == id))
                }
            }
            AdvancedDeviceSelection::RoundRobin => {
                let idx = self.round_robin_idx % self.devices.len();
                self.round_robin_idx += 1;
                self.devices.get(idx)
            }
            AdvancedDeviceSelection::LoadBalanced => {
                let least_utilized = self
                    .utilization
                    .iter()
                    .min_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
                    .map(|(id, _)| *id);
                least_utilized.and_then(|id| self.devices.iter().find(|d| d.device_id == id))
            }
            AdvancedDeviceSelection::MultiGpu { max_gpus } => {
                // The primary device is simply the first one, unless the GPU
                // budget is zero. (Simplified from `iter().take(n).next()`,
                // which had the same behavior.)
                if *max_gpus == 0 {
                    None
                } else {
                    self.devices.first()
                }
            }
        }
    }

    /// Records `utilization` (clamped to [0, 1]) for `device_id`.
    pub fn update_utilization(&mut self, device_id: u32, utilization: f64) {
        self.utilization
            .insert(device_id, utilization.clamp(0.0, 1.0));
    }

    /// Stores (or replaces) the benchmark result for its device.
    pub fn add_benchmark(&mut self, benchmark: DeviceBenchmark) {
        self.benchmarks.insert(benchmark.device_id, benchmark);
    }

    /// All registered devices, in registration order.
    pub fn devices(&self) -> &[GpuDevice] {
        &self.devices
    }

    /// The active selection strategy.
    pub fn strategy(&self) -> &AdvancedDeviceSelection {
        &self.strategy
    }

    /// Replaces the selection strategy; device and benchmark state is kept.
    pub fn set_strategy(&mut self, strategy: AdvancedDeviceSelection) {
        self.strategy = strategy;
    }
}
/// Top-level configuration for GPU-accelerated clustering.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuAccelerationConfig {
/// Master switch; when false, everything runs on the CPU path.
pub enabled: bool,
/// Preferred compute backend.
pub backend: GpuBackend,
/// How to choose among available devices.
pub device_selection: AdvancedDeviceSelection,
/// GPU memory allocation policy.
pub memory_strategy: AdvancedMemoryStrategy,
/// Tensor-core usage settings.
pub tensor_cores: TensorCoreConfig,
/// Fall back to CPU instead of erroring when GPU setup fails.
pub auto_fallback: bool,
/// Minimum element count (samples * features) before GPU is worthwhile.
pub min_problem_size: usize,
/// Kernel tile size.
pub tile_size: usize,
/// Whether to use asynchronous kernel execution.
pub async_execution: bool,
/// Number of concurrent streams when async execution is on.
pub num_streams: usize,
/// Whether to collect profiling records.
pub enable_profiling: bool,
/// Low-level kernel optimization toggles.
pub kernel_optimizations: KernelOptimizations,
}
impl Default for GpuAccelerationConfig {
    /// Acceleration enabled in principle, but targeting the CPU fallback
    /// backend until a concrete backend preset is chosen.
    fn default() -> Self {
        Self {
            enabled: true,
            auto_fallback: true,
            backend: GpuBackend::CpuFallback,
            device_selection: AdvancedDeviceSelection::Auto,
            memory_strategy: AdvancedMemoryStrategy::Adaptive,
            tensor_cores: TensorCoreConfig::default(),
            kernel_optimizations: KernelOptimizations::default(),
            // Below 1000 elements the GPU path is not attempted.
            min_problem_size: 1000,
            tile_size: 256,
            async_execution: true,
            num_streams: 4,
            enable_profiling: false,
        }
    }
}
impl GpuAccelerationConfig {
pub fn cuda() -> Self {
Self {
backend: GpuBackend::Cuda,
tensor_cores: TensorCoreConfig {
enabled: true,
precision: PrecisionMode::Mixed,
..Default::default()
},
..Default::default()
}
}
pub fn opencl() -> Self {
Self {
backend: GpuBackend::OpenCl,
tensor_cores: TensorCoreConfig {
enabled: false,
precision: PrecisionMode::Single,
..Default::default()
},
..Default::default()
}
}
pub fn rocm() -> Self {
Self {
backend: GpuBackend::Rocm,
tensor_cores: TensorCoreConfig {
enabled: true,
precision: PrecisionMode::Mixed,
..Default::default()
},
..Default::default()
}
}
pub fn metal() -> Self {
Self {
backend: GpuBackend::Metal,
tensor_cores: TensorCoreConfig {
enabled: true,
precision: PrecisionMode::Half,
..Default::default()
},
..Default::default()
}
}
pub fn cpu() -> Self {
Self {
enabled: false,
backend: GpuBackend::CpuFallback,
..Default::default()
}
}
pub fn to_basic_config(&self) -> GpuConfig {
GpuConfig {
preferred_backend: self.backend,
device_selection: self.device_selection.clone().into(),
auto_fallback: self.auto_fallback,
memory_pool_size: match self.memory_strategy {
AdvancedMemoryStrategy::Pool { pool_size } => Some(pool_size),
_ => None,
},
optimize_memory: true,
backend_options: HashMap::new(),
}
}
}
/// Low-level kernel optimization toggles.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KernelOptimizations {
/// Unroll inner loops.
pub loop_unrolling: bool,
/// Tile working sets through shared memory.
pub shared_memory_tiling: bool,
/// Block computations in registers.
pub register_blocking: bool,
/// Use vectorized (wide) memory loads.
pub vectorized_loads: bool,
/// Route reads through texture memory.
pub texture_memory: bool,
/// Place small read-only data in constant memory.
pub constant_memory: bool,
/// Occupancy tuning level (meaning defined by the kernel backend).
pub occupancy_level: u8,
}
impl Default for KernelOptimizations {
    /// Enables the generally-beneficial optimizations; texture memory stays
    /// off by default.
    fn default() -> Self {
        Self {
            loop_unrolling: true,
            shared_memory_tiling: true,
            register_blocking: true,
            vectorized_loads: true,
            constant_memory: true,
            texture_memory: false,
            occupancy_level: 2,
        }
    }
}
/// K-means clustering with optional GPU acceleration and CPU fallback.
#[derive(Debug)]
pub struct GpuKMeans<F: Float> {
// User-supplied acceleration settings.
config: GpuAccelerationConfig,
// GPU context; present only when a backend was successfully initialized.
context: Option<GpuContext>,
// Strategy-aware allocator for device memory.
memory_manager: AdvancedGpuMemoryManager,
// Device chooser. NOTE(review): populated at construction but not
// consulted by `fit` — confirm whether multi-device selection is pending.
device_selector: DeviceSelector,
// Tensor-core features detected for the synthesized device.
tensor_caps: TensorCoreCapabilities,
// Whether a usable GPU context exists.
gpu_available: bool,
// Profiling records exposed via `profiling_data()`.
profiling_data: Vec<ProfilingRecord>,
// Ties the element type `F` to the struct without storing a value.
_phantom: std::marker::PhantomData<F>,
}
/// One profiled GPU operation.
#[derive(Debug, Clone)]
pub struct ProfilingRecord {
/// Name of the profiled operation.
pub operation: String,
/// Duration in microseconds.
pub duration_us: u64,
/// Bytes moved between host and device.
pub memory_transferred: usize,
/// Number of compute operations performed.
pub compute_ops: usize,
/// When the operation ran.
pub timestamp: Instant,
}
/// Outcome of a k-means fit, plus execution metrics.
#[derive(Debug, Clone)]
pub struct GpuKMeansResult<F: Float> {
/// Final cluster centers, shape (k, n_features).
pub centroids: Array2<F>,
/// Cluster index assigned to each sample.
pub labels: Array1<usize>,
/// Sum of squared distances from each sample to its assigned centroid.
pub inertia: F,
/// Number of Lloyd iterations actually executed.
pub n_iterations: usize,
/// Whether the tolerance criterion was met before `max_iter`.
pub converged: bool,
/// Timing and resource metrics for the run.
pub metrics: KMeansMetrics,
}
/// Timing and resource metrics recorded during a k-means fit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KMeansMetrics {
/// End-to-end fit time.
pub total_time_ms: f64,
/// Time spent in distance computation. NOTE(review): currently always 0;
/// distance work is folded into label assignment time.
pub distance_time_ms: f64,
/// Time spent updating centroids.
pub centroid_update_time_ms: f64,
/// Time spent assigning labels (includes distance computation).
pub label_assignment_time_ms: f64,
/// Host/device transfer time. NOTE(review): not measured — always 0.
pub transfer_time_ms: f64,
/// Whether the GPU path was taken.
pub used_gpu: bool,
/// Backend name ("CPU" for the fallback path).
pub backend: String,
/// Device bytes in use at the end of the fit (0 on the CPU path).
pub memory_used: usize,
/// Samples processed per second of total time.
pub throughput: f64,
}
impl<F: Float + FromPrimitive + Send + Sync + 'static> GpuKMeans<F> {
/// Builds a k-means runner from `config`, attempting to create a GPU
/// context up front; with auto-fallback enabled a failed context simply
/// degrades the runner to CPU execution. The memory budget comes from
/// the detected device, or defaults to 1 GiB when no context exists.
pub fn new(config: GpuAccelerationConfig) -> Result<Self> {
let device_selector = DeviceSelector::new(config.device_selection.clone());
let (context, gpu_available, tensor_caps) = Self::try_create_context(&config)?;
let available_memory = context
.as_ref()
.map(|ctx| ctx.device.available_memory)
.unwrap_or(1024 * 1024 * 1024);
let memory_manager =
AdvancedGpuMemoryManager::new(config.memory_strategy, available_memory);
Ok(Self {
config,
context,
memory_manager,
device_selector,
tensor_caps,
gpu_available,
profiling_data: Vec::new(),
_phantom: std::marker::PhantomData,
})
}
/// Tries to construct a GPU context for `config`. Returns `(None, false,
/// default caps)` when GPU use is disabled or the backend is the CPU
/// fallback; a context failure is swallowed when auto-fallback is on and
/// propagated otherwise.
/// NOTE(review): the device is synthesized with hard-coded specs
/// (8 GB total / 6 GB free, 1024 compute units) rather than enumerated
/// from real hardware — presumably a placeholder; confirm.
fn try_create_context(
config: &GpuAccelerationConfig,
) -> Result<(Option<GpuContext>, bool, TensorCoreCapabilities)> {
if !config.enabled || config.backend == GpuBackend::CpuFallback {
return Ok((None, false, TensorCoreCapabilities::default()));
}
let device = GpuDevice::new(
0,
format!("{} Device", config.backend),
8_000_000_000,
6_000_000_000,
"1.0".to_string(),
1024,
config.backend,
true,
);
let tensor_caps = detect_tensor_core_capabilities(&device);
let basic_config = config.to_basic_config();
match GpuContext::new(device.clone(), basic_config) {
Ok(ctx) => Ok((Some(ctx), true, tensor_caps)),
Err(_) if config.auto_fallback => Ok((None, false, TensorCoreCapabilities::default())),
Err(e) => Err(e),
}
}
/// Clusters `data` into `k` groups, running at most `max_iter` Lloyd
/// iterations and stopping once the maximum centroid shift drops to
/// `tol` or below. Dispatches to the GPU path only when the problem is
/// large enough and a GPU context is available.
///
/// # Errors
/// Returns `ClusteringError::InvalidInput` when `k` is zero or exceeds
/// the number of samples.
pub fn fit(
&mut self,
data: ArrayView2<F>,
k: usize,
max_iter: usize,
tol: F,
) -> Result<GpuKMeansResult<F>> {
let start_time = Instant::now();
let n_samples = data.nrows();
let n_features = data.ncols();
if k == 0 || k > n_samples {
return Err(ClusteringError::InvalidInput(format!(
"k must be between 1 and n_samples ({}), got {}",
n_samples, k
)));
}
let use_gpu = self.should_use_gpu(n_samples, n_features);
if use_gpu && self.gpu_available {
self.fit_gpu(data, k, max_iter, tol, start_time)
} else {
self.fit_cpu(data, k, max_iter, tol, start_time)
}
}
/// GPU is worthwhile only when the total element count reaches the
/// configured minimum problem size (and acceleration is enabled at all).
fn should_use_gpu(&self, n_samples: usize, n_features: usize) -> bool {
let problem_size = n_samples * n_features;
problem_size >= self.config.min_problem_size && self.config.enabled
}
/// GPU fit loop. Convergence is declared when either the maximum
/// centroid shift is within `tol` or the inertia change is below
/// `tol * 0.01` (a relative-improvement shortcut).
/// NOTE(review): `distance_time` is declared but never accumulated, so
/// the reported `distance_time_ms` is always 0 — distance work is folded
/// into the label-assignment timing.
fn fit_gpu(
&mut self,
data: ArrayView2<F>,
k: usize,
max_iter: usize,
tol: F,
start_time: Instant,
) -> Result<GpuKMeansResult<F>> {
let n_samples = data.nrows();
let n_features = data.ncols();
let mut centroids = self.initialize_centroids_gpu(data, k)?;
let mut labels = Array1::zeros(n_samples);
let mut inertia = F::infinity();
let mut converged = false;
let mut n_iterations = 0;
let mut distance_time = Duration::ZERO;
let mut centroid_time = Duration::ZERO;
let mut label_time = Duration::ZERO;
for iter in 0..max_iter {
n_iterations = iter + 1;
let label_start = Instant::now();
let (new_labels, distances) = self.compute_labels_gpu(data, centroids.view())?;
labels = new_labels;
label_time += label_start.elapsed();
let centroid_start = Instant::now();
let new_centroids = self.compute_centroids_gpu(data, &labels, k)?;
centroid_time += centroid_start.elapsed();
let new_inertia = self.compute_inertia(&distances);
let centroid_shift =
self.compute_centroid_shift(centroids.view(), new_centroids.view());
centroids = new_centroids;
// First iteration: `inertia` is infinity, so only the shift
// criterion can trigger here.
if centroid_shift <= tol
|| (inertia - new_inertia).abs() < tol * F::from(0.01).unwrap_or(tol)
{
converged = true;
inertia = new_inertia;
break;
}
inertia = new_inertia;
}
let total_time = start_time.elapsed();
let metrics = KMeansMetrics {
total_time_ms: total_time.as_secs_f64() * 1000.0,
distance_time_ms: distance_time.as_secs_f64() * 1000.0,
centroid_update_time_ms: centroid_time.as_secs_f64() * 1000.0,
label_assignment_time_ms: label_time.as_secs_f64() * 1000.0,
// Transfers are not measured separately on this path.
transfer_time_ms: 0.0, used_gpu: true,
backend: format!("{}", self.config.backend),
memory_used: self.memory_manager.get_stats().current_bytes_in_use,
throughput: n_samples as f64 / total_time.as_secs_f64(),
};
Ok(GpuKMeansResult {
centroids,
labels,
inertia,
n_iterations,
converged,
metrics,
})
}
/// CPU fit loop. Unlike the GPU path, convergence uses only the
/// centroid-shift criterion, and per-phase timings are not measured.
fn fit_cpu(
&self,
data: ArrayView2<F>,
k: usize,
max_iter: usize,
tol: F,
start_time: Instant,
) -> Result<GpuKMeansResult<F>> {
let n_samples = data.nrows();
let n_features = data.ncols();
let mut centroids = self.initialize_centroids_cpu(data, k)?;
let mut labels = Array1::zeros(n_samples);
let mut inertia = F::infinity();
let mut converged = false;
let mut n_iterations = 0;
for iter in 0..max_iter {
n_iterations = iter + 1;
let (new_labels, distances) = self.assign_labels_cpu(data, centroids.view())?;
labels = new_labels;
let new_centroids = self.update_centroids_cpu(data, &labels, k, n_features)?;
let new_inertia = self.compute_inertia(&distances);
let centroid_shift =
self.compute_centroid_shift(centroids.view(), new_centroids.view());
centroids = new_centroids;
if centroid_shift <= tol {
converged = true;
inertia = new_inertia;
break;
}
inertia = new_inertia;
}
let total_time = start_time.elapsed();
let metrics = KMeansMetrics {
total_time_ms: total_time.as_secs_f64() * 1000.0,
distance_time_ms: 0.0,
centroid_update_time_ms: 0.0,
label_assignment_time_ms: 0.0,
transfer_time_ms: 0.0,
used_gpu: false,
backend: "CPU".to_string(),
memory_used: 0,
throughput: n_samples as f64 / total_time.as_secs_f64(),
};
Ok(GpuKMeansResult {
centroids,
labels,
inertia,
n_iterations,
converged,
metrics,
})
}
// GPU initialization currently delegates to the CPU implementation
// (placeholder until a device kernel exists).
fn initialize_centroids_gpu(&self, data: ArrayView2<F>, k: usize) -> Result<Array2<F>> {
self.initialize_centroids_cpu(data, k)
}
/// k-means++-style seeding: the first centroid is drawn uniformly, each
/// subsequent one is drawn with probability proportional to the squared
/// distance to its nearest already-chosen centroid.
fn initialize_centroids_cpu(&self, data: ArrayView2<F>, k: usize) -> Result<Array2<F>> {
let n_samples = data.nrows();
let n_features = data.ncols();
let mut centroids = Array2::zeros((k, n_features));
let mut rng = scirs2_core::random::rng();
let first_idx = scirs2_core::random::RngExt::random_range(&mut rng, 0..n_samples);
for j in 0..n_features {
centroids[[0, j]] = data[[first_idx, j]];
}
if k == 1 {
return Ok(centroids);
}
// min_distances[s] = squared distance from sample s to its nearest
// chosen centroid; updated incrementally against only the newest one.
let mut min_distances = Array1::from_elem(n_samples, F::infinity());
for i in 1..k {
for sample_idx in 0..n_samples {
let dist =
self.euclidean_distance_squared(data.row(sample_idx), centroids.row(i - 1));
if dist < min_distances[sample_idx] {
min_distances[sample_idx] = dist;
}
}
let sum_distances: F = min_distances.iter().copied().fold(F::zero(), |a, b| a + b);
// Degenerate case (all points coincide with chosen centroids):
// fall back to a uniform draw.
if sum_distances <= F::zero() {
let idx = scirs2_core::random::RngExt::random_range(&mut rng, 0..n_samples);
for j in 0..n_features {
centroids[[i, j]] = data[[idx, j]];
}
continue;
}
// Weighted sampling via inverse-CDF on the cumulative distances.
let threshold = F::from(scirs2_core::random::RngExt::random_range(
&mut rng,
0.0..1.0,
))
.unwrap_or(F::zero())
* sum_distances;
let mut cumsum = F::zero();
// NOTE(review): if rounding keeps cumsum below threshold for every
// sample, next_idx silently defaults to 0.
let mut next_idx = 0;
for (idx, &dist) in min_distances.iter().enumerate() {
cumsum = cumsum + dist;
if cumsum >= threshold {
next_idx = idx;
break;
}
}
for j in 0..n_features {
centroids[[i, j]] = data[[next_idx, j]];
}
}
Ok(centroids)
}
// GPU label assignment currently delegates to the CPU implementation
// (placeholder until a device kernel exists).
fn compute_labels_gpu(
&self,
data: ArrayView2<F>,
centroids: ArrayView2<F>,
) -> Result<(Array1<usize>, Array1<F>)> {
self.assign_labels_cpu(data, centroids)
}
// GPU centroid update currently delegates to the CPU implementation
// (placeholder until a device kernel exists).
fn compute_centroids_gpu(
&self,
data: ArrayView2<F>,
labels: &Array1<usize>,
k: usize,
) -> Result<Array2<F>> {
let n_features = data.ncols();
self.update_centroids_cpu(data, labels, k, n_features)
}
/// Assigns each sample to its nearest centroid. Returns `(labels,
/// distances)` where `distances[i]` is the squared distance from sample
/// `i` to its assigned centroid.
fn assign_labels_cpu(
&self,
data: ArrayView2<F>,
centroids: ArrayView2<F>,
) -> Result<(Array1<usize>, Array1<F>)> {
let n_samples = data.nrows();
let n_centroids = centroids.nrows();
let mut labels = Array1::zeros(n_samples);
let mut distances = Array1::zeros(n_samples);
for i in 0..n_samples {
let mut min_dist = F::infinity();
let mut min_label = 0;
for j in 0..n_centroids {
let dist = self.euclidean_distance_squared(data.row(i), centroids.row(j));
if dist < min_dist {
min_dist = dist;
min_label = j;
}
}
labels[i] = min_label;
distances[i] = min_dist;
}
Ok((labels, distances))
}
/// Recomputes each centroid as the mean of its assigned samples.
/// Out-of-range labels are ignored; a cluster that receives no samples
/// keeps the zero vector as its centroid.
fn update_centroids_cpu(
&self,
data: ArrayView2<F>,
labels: &Array1<usize>,
k: usize,
n_features: usize,
) -> Result<Array2<F>> {
let mut centroids = Array2::zeros((k, n_features));
let mut counts = vec![0usize; k];
for (i, &label) in labels.iter().enumerate() {
if label < k {
for j in 0..n_features {
centroids[[label, j]] = centroids[[label, j]] + data[[i, j]];
}
counts[label] += 1;
}
}
for i in 0..k {
if counts[i] > 0 {
let count = F::from(counts[i]).unwrap_or(F::one());
for j in 0..n_features {
centroids[[i, j]] = centroids[[i, j]] / count;
}
}
}
Ok(centroids)
}
/// Squared Euclidean distance between two feature vectors.
fn euclidean_distance_squared(
&self,
a: scirs2_core::ndarray::ArrayView1<F>,
b: scirs2_core::ndarray::ArrayView1<F>,
) -> F {
a.iter()
.zip(b.iter())
.map(|(&x, &y)| {
let diff = x - y;
diff * diff
})
.fold(F::zero(), |acc, x| acc + x)
}
/// Inertia = sum of per-sample squared distances to assigned centroids.
fn compute_inertia(&self, distances: &Array1<F>) -> F {
distances.iter().copied().fold(F::zero(), |a, b| a + b)
}
/// Largest Euclidean (not squared) movement of any centroid between two
/// consecutive iterations; used as the convergence measure.
fn compute_centroid_shift(&self, old: ArrayView2<F>, new: ArrayView2<F>) -> F {
let mut max_shift = F::zero();
for i in 0..old.nrows() {
let shift = self
.euclidean_distance_squared(old.row(i), new.row(i))
.sqrt();
if shift > max_shift {
max_shift = shift;
}
}
max_shift
}
/// The acceleration configuration in use.
pub fn config(&self) -> &GpuAccelerationConfig {
&self.config
}
/// Whether a usable GPU context was created.
pub fn is_gpu_available(&self) -> bool {
self.gpu_available
}
/// Tensor-core features detected for the active device.
pub fn tensor_core_capabilities(&self) -> &TensorCoreCapabilities {
&self.tensor_caps
}
/// Current device memory statistics.
pub fn memory_stats(&self) -> &MemoryUsageStats {
self.memory_manager.get_stats()
}
/// Collected profiling records (empty unless profiling populated them).
pub fn profiling_data(&self) -> &[ProfilingRecord] {
&self.profiling_data
}
}
#[cfg(test)]
mod tests {
// These tests exercise only the CPU paths; no GPU hardware is required
// because every fit uses the CPU-fallback configuration.
use super::*;
use scirs2_core::ndarray::Array2;
// Display labels, including the MB-scaled Streaming variant.
#[test]
fn test_advanced_memory_strategy_display() {
assert_eq!(
AdvancedMemoryStrategy::Conservative.to_string(),
"Conservative"
);
assert_eq!(
AdvancedMemoryStrategy::Streaming {
chunk_size: 1024 * 1024
}
.to_string(),
"Streaming(1MB)"
);
}
// Construction preserves the requested strategy.
#[test]
fn test_advanced_memory_manager_creation() {
let manager = AdvancedGpuMemoryManager::new(
AdvancedMemoryStrategy::Adaptive,
4 * 1024 * 1024 * 1024, );
assert_eq!(manager.strategy(), AdvancedMemoryStrategy::Adaptive);
}
// A small conservative allocation succeeds and is counted in the stats.
#[test]
fn test_advanced_memory_allocation() {
let mut manager = AdvancedGpuMemoryManager::new(
AdvancedMemoryStrategy::Conservative,
1024 * 1024 * 1024, );
let result = manager.allocate(1024);
assert!(result.is_ok());
let stats = manager.get_stats();
assert_eq!(stats.total_allocations, 1);
assert_eq!(stats.successful_allocations, 1);
}
// Display labels for precision modes.
#[test]
fn test_precision_mode_display() {
assert_eq!(PrecisionMode::Mixed.to_string(), "Mixed (f16/f32)");
assert_eq!(PrecisionMode::TensorFloat32.to_string(), "TF32");
}
// Default tensor-core config: enabled, auto precision, auto loss scaling.
#[test]
fn test_tensor_core_config_default() {
let config = TensorCoreConfig::default();
assert!(config.enabled);
assert_eq!(config.precision, PrecisionMode::Auto);
assert!(config.auto_scale);
}
// A fresh selector holds no devices.
#[test]
fn test_device_selector_creation() {
let selector = DeviceSelector::new(AdvancedDeviceSelection::Auto);
assert!(selector.devices().is_empty());
}
// Registering a device makes it visible via `devices()`.
#[test]
fn test_device_selector_add_device() {
let mut selector = DeviceSelector::new(AdvancedDeviceSelection::MostMemory);
let device = GpuDevice::new(
0,
"Test GPU".to_string(),
8_000_000_000,
6_000_000_000,
"1.0".to_string(),
1024,
GpuBackend::Cuda,
true,
);
selector.add_device(device);
assert_eq!(selector.devices().len(), 1);
}
// Default config enables acceleration and auto-fallback.
#[test]
fn test_gpu_acceleration_config_default() {
let config = GpuAccelerationConfig::default();
assert!(config.enabled);
assert!(config.auto_fallback);
}
// CUDA preset selects the CUDA backend with tensor cores on.
#[test]
fn test_gpu_acceleration_config_cuda() {
let config = GpuAccelerationConfig::cuda();
assert_eq!(config.backend, GpuBackend::Cuda);
assert!(config.tensor_cores.enabled);
}
// Construction with the CPU preset never errors.
#[test]
fn test_gpu_kmeans_creation() {
let config = GpuAccelerationConfig::cpu();
let kmeans = GpuKMeans::<f64>::new(config);
assert!(kmeans.is_ok());
}
// A CPU-fallback fit produces k centroids, one label per sample, and
// reports used_gpu == false.
#[test]
fn test_gpu_kmeans_fit_cpu_fallback() {
let config = GpuAccelerationConfig::cpu();
let mut kmeans = GpuKMeans::<f64>::new(config).expect("Failed to create GpuKMeans");
let data = Array2::from_shape_vec(
(6, 2),
vec![1.0, 2.0, 1.2, 1.8, 0.8, 1.9, 4.0, 5.0, 4.2, 4.8, 3.9, 5.1],
)
.expect("Failed to create test data");
let result = kmeans.fit(data.view(), 2, 100, 1e-4);
assert!(result.is_ok());
let result = result.expect("Failed to fit");
assert_eq!(result.centroids.nrows(), 2);
assert_eq!(result.labels.len(), 6);
assert!(!result.metrics.used_gpu);
}
// Two well-separated clusters must converge quickly.
#[test]
fn test_gpu_kmeans_convergence() {
let config = GpuAccelerationConfig::cpu();
let mut kmeans = GpuKMeans::<f64>::new(config).expect("Failed to create GpuKMeans");
let data = Array2::from_shape_vec(
(8, 2),
vec![
0.0, 0.0, 0.1, 0.1, 0.0, 0.1, 0.1, 0.0, 10.0, 10.0, 10.1, 10.1, 10.0, 10.1, 10.1,
10.0,
],
)
.expect("Failed to create test data");
let result = kmeans.fit(data.view(), 2, 100, 1e-6);
assert!(result.is_ok());
let result = result.expect("Failed to fit");
assert!(result.converged);
assert!(result.n_iterations < 50);
}
// Repeated allocations are all counted and efficiency stays positive.
#[test]
fn test_memory_usage_stats() {
let mut manager =
AdvancedGpuMemoryManager::new(AdvancedMemoryStrategy::Aggressive, 1024 * 1024 * 1024);
for _ in 0..5 {
let _ = manager.allocate(1024);
}
let stats = manager.get_stats();
assert_eq!(stats.total_allocations, 5);
assert!(stats.efficiency > 0.0);
}
// Default kernel optimizations: the safe set is enabled.
#[test]
fn test_kernel_optimizations_default() {
let opts = KernelOptimizations::default();
assert!(opts.loop_unrolling);
assert!(opts.shared_memory_tiling);
assert_eq!(opts.occupancy_level, 2);
}
// CUDA devices report tensor cores with a non-empty precision list.
#[test]
fn test_detect_tensor_core_capabilities() {
let cuda_device = GpuDevice::new(
0,
"CUDA Device".to_string(),
8_000_000_000,
6_000_000_000,
"8.0".to_string(),
1024,
GpuBackend::Cuda,
true,
);
let caps = detect_tensor_core_capabilities(&cuda_device);
assert!(caps.available);
assert!(!caps.supported_precisions.is_empty());
}
// ProfilingRecord is a plain data carrier.
#[test]
fn test_profiling_record_creation() {
let record = ProfilingRecord {
operation: "distance_compute".to_string(),
duration_us: 1000,
memory_transferred: 1024 * 1024,
compute_ops: 1000000,
timestamp: Instant::now(),
};
assert_eq!(record.operation, "distance_compute");
assert_eq!(record.duration_us, 1000);
}
}