use anyhow::{anyhow, Result};
use scirs2_core::gpu::{GpuBackend, GpuContext};
use scirs2_core::ndarray_ext::{Array1, Array2};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuAccelerationConfig {
pub enabled: bool,
pub device_ids: Vec<usize>,
pub memory_pool_size_mb: usize,
pub mixed_precision: bool,
pub tensor_caching: bool,
pub cache_size_mb: usize,
pub kernel_fusion: bool,
pub memory_mapping: bool,
pub unified_memory: bool,
pub multi_stream: bool,
pub num_streams: usize,
pub pipeline_parallelism: bool,
pub pipeline_stages: usize,
}
impl Default for GpuAccelerationConfig {
fn default() -> Self {
Self {
enabled: true,
device_ids: vec![0],
memory_pool_size_mb: 2048, mixed_precision: true,
tensor_caching: true,
cache_size_mb: 512, kernel_fusion: true,
memory_mapping: true,
unified_memory: false, multi_stream: true,
num_streams: 4,
pipeline_parallelism: false, pipeline_stages: 2,
}
}
}
pub struct GpuMemoryPool {
config: GpuAccelerationConfig,
allocated_blocks: Arc<Mutex<HashMap<usize, MemoryBlock>>>,
free_blocks: Arc<Mutex<VecDeque<MemoryBlock>>>,
total_allocated: Arc<Mutex<usize>>,
allocation_stats: Arc<Mutex<AllocationStats>>,
}
#[derive(Debug, Clone)]
struct MemoryBlock {
device_id: usize,
size_bytes: usize,
ptr: usize, allocated_at: Instant,
last_used: Instant,
}
#[derive(Debug, Default, Clone)]
pub struct AllocationStats {
pub total_allocations: usize,
pub total_deallocations: usize,
pub peak_memory_usage: usize,
pub current_memory_usage: usize,
pub cache_hits: usize,
pub cache_misses: usize,
}
impl GpuMemoryPool {
pub fn new(config: GpuAccelerationConfig) -> Self {
Self {
config,
allocated_blocks: Arc::new(Mutex::new(HashMap::new())),
free_blocks: Arc::new(Mutex::new(VecDeque::new())),
total_allocated: Arc::new(Mutex::new(0)),
allocation_stats: Arc::new(Mutex::new(AllocationStats::default())),
}
}
pub fn allocate(&self, size_bytes: usize, device_id: usize) -> Result<usize> {
let mut free_blocks = self.free_blocks.lock().expect("lock poisoned");
let mut allocated_blocks = self.allocated_blocks.lock().expect("lock poisoned");
let mut stats = self.allocation_stats.lock().expect("lock poisoned");
for (i, block) in free_blocks.iter().enumerate() {
if block.size_bytes >= size_bytes && block.device_id == device_id {
let block = free_blocks
.remove(i)
.expect("index i should be valid from enumerate");
let block_id = block.ptr;
let mut reused_block = block;
reused_block.last_used = Instant::now();
allocated_blocks.insert(block_id, reused_block);
stats.cache_hits += 1;
debug!(
"Reused GPU memory block {} of size {}",
block_id, size_bytes
);
return Ok(block_id);
}
}
stats.cache_misses += 1;
stats.total_allocations += 1;
let block_id = stats.total_allocations; let now = Instant::now();
let block = MemoryBlock {
device_id,
size_bytes,
ptr: block_id,
allocated_at: now,
last_used: now,
};
allocated_blocks.insert(block_id, block);
let mut total_allocated = self.total_allocated.lock().expect("lock poisoned");
*total_allocated += size_bytes;
stats.current_memory_usage += size_bytes;
if stats.current_memory_usage > stats.peak_memory_usage {
stats.peak_memory_usage = stats.current_memory_usage;
}
info!(
"Allocated new GPU memory block {} of size {} bytes",
block_id, size_bytes
);
Ok(block_id)
}
pub fn deallocate(&self, block_id: usize) -> Result<()> {
let mut allocated_blocks = self.allocated_blocks.lock().expect("lock poisoned");
let mut free_blocks = self.free_blocks.lock().expect("lock poisoned");
let mut stats = self.allocation_stats.lock().expect("lock poisoned");
if let Some(block) = allocated_blocks.remove(&block_id) {
stats.total_deallocations += 1;
stats.current_memory_usage -= block.size_bytes;
free_blocks.push_back(block);
if free_blocks.len() > 100 {
free_blocks.pop_front();
}
debug!("Deallocated GPU memory block {}", block_id);
Ok(())
} else {
Err(anyhow!("Block {} not found for deallocation", block_id))
}
}
pub fn get_stats(&self) -> AllocationStats {
(*self.allocation_stats.lock().expect("lock poisoned")).clone()
}
pub fn defragment(&self) -> Result<()> {
let mut free_blocks = self.free_blocks.lock().expect("lock poisoned");
let mut blocks: Vec<_> = free_blocks.drain(..).collect();
blocks.sort_by_key(|b| (b.device_id, b.size_bytes));
let mut merged_blocks = VecDeque::new();
let mut current_block: Option<MemoryBlock> = None;
for block in blocks {
if let Some(ref mut current) = current_block {
if current.device_id == block.device_id {
current.size_bytes += block.size_bytes;
} else {
merged_blocks.push_back(current.clone());
current_block = Some(block);
}
} else {
current_block = Some(block);
}
}
if let Some(block) = current_block {
merged_blocks.push_back(block);
}
*free_blocks = merged_blocks;
info!(
"Memory defragmentation completed, {} free blocks remaining",
free_blocks.len()
);
Ok(())
}
}
pub struct TensorCache {
config: GpuAccelerationConfig,
entity_tensors: Arc<Mutex<HashMap<String, CachedTensor>>>,
attention_weights: Arc<Mutex<HashMap<String, CachedTensor>>>,
intermediate_activations: Arc<Mutex<HashMap<String, CachedTensor>>>,
cache_stats: Arc<Mutex<CacheStats>>,
}
#[derive(Debug, Clone)]
struct CachedTensor {
data: Array2<f32>, device_id: usize,
last_accessed: Instant,
access_count: usize,
size_bytes: usize,
}
#[derive(Debug, Default, Clone)]
pub struct CacheStats {
pub hits: usize,
pub misses: usize,
pub evictions: usize,
pub total_memory_usage: usize,
}
impl TensorCache {
pub fn new(config: GpuAccelerationConfig) -> Self {
Self {
config,
entity_tensors: Arc::new(Mutex::new(HashMap::new())),
attention_weights: Arc::new(Mutex::new(HashMap::new())),
intermediate_activations: Arc::new(Mutex::new(HashMap::new())),
cache_stats: Arc::new(Mutex::new(CacheStats::default())),
}
}
pub fn cache_entity_tensor(&self, entity: &str, tensor: Array2<f32>, device_id: usize) {
let mut cache = self.entity_tensors.lock().expect("lock poisoned");
let mut stats = self.cache_stats.lock().expect("lock poisoned");
let size_bytes = tensor.len() * std::mem::size_of::<f32>();
let cached_tensor = CachedTensor {
data: tensor,
device_id,
last_accessed: Instant::now(),
access_count: 1,
size_bytes,
};
self.evict_if_needed(&mut stats);
cache.insert(entity.to_string(), cached_tensor);
stats.total_memory_usage += size_bytes;
debug!("Cached entity tensor for {}", entity);
}
pub fn get_entity_tensor(&self, entity: &str) -> Option<Array2<f32>> {
let mut cache = self.entity_tensors.lock().expect("lock poisoned");
let mut stats = self.cache_stats.lock().expect("lock poisoned");
if let Some(cached) = cache.get_mut(entity) {
cached.last_accessed = Instant::now();
cached.access_count += 1;
stats.hits += 1;
debug!("Cache hit for entity tensor {}", entity);
Some(cached.data.clone())
} else {
stats.misses += 1;
debug!("Cache miss for entity tensor {}", entity);
None
}
}
pub fn cache_attention_weights(&self, key: &str, weights: Array2<f32>, device_id: usize) {
let mut cache = self.attention_weights.lock().expect("lock poisoned");
let mut stats = self.cache_stats.lock().expect("lock poisoned");
let size_bytes = weights.len() * std::mem::size_of::<f32>();
let cached_tensor = CachedTensor {
data: weights,
device_id,
last_accessed: Instant::now(),
access_count: 1,
size_bytes,
};
self.evict_if_needed(&mut stats);
cache.insert(key.to_string(), cached_tensor);
stats.total_memory_usage += size_bytes;
debug!("Cached attention weights for key {}", key);
}
pub fn get_attention_weights(&self, key: &str) -> Option<Array2<f32>> {
let mut cache = self.attention_weights.lock().expect("lock poisoned");
let mut stats = self.cache_stats.lock().expect("lock poisoned");
if let Some(cached) = cache.get_mut(key) {
cached.last_accessed = Instant::now();
cached.access_count += 1;
stats.hits += 1;
debug!("Cache hit for attention weights {}", key);
Some(cached.data.clone())
} else {
stats.misses += 1;
debug!("Cache miss for attention weights {}", key);
None
}
}
fn evict_if_needed(&self, stats: &mut CacheStats) {
let max_memory = self.config.cache_size_mb * 1024 * 1024;
if stats.total_memory_usage > max_memory {
stats.evictions += 1;
stats.total_memory_usage = max_memory / 2;
warn!("Tensor cache eviction triggered, freed memory");
}
}
pub fn get_stats(&self) -> CacheStats {
(*self.cache_stats.lock().expect("lock poisoned")).clone()
}
pub fn clear_all(&self) {
self.entity_tensors.lock().expect("lock poisoned").clear();
self.attention_weights
.lock()
.expect("lock poisoned")
.clear();
self.intermediate_activations
.lock()
.expect("lock poisoned")
.clear();
let mut stats = self.cache_stats.lock().expect("lock poisoned");
stats.total_memory_usage = 0;
info!("Cleared all tensor caches");
}
}
pub struct MixedPrecisionProcessor {
config: GpuAccelerationConfig,
fp16_enabled: bool,
loss_scaling: f32,
overflow_detection: bool,
}
impl MixedPrecisionProcessor {
pub fn new(config: GpuAccelerationConfig) -> Self {
Self {
config: config.clone(),
fp16_enabled: config.mixed_precision,
loss_scaling: 65536.0, overflow_detection: true,
}
}
pub fn to_fp16(&self, tensor: &Array2<f32>) -> Array2<f32> {
if !self.fp16_enabled {
return tensor.clone();
}
tensor.mapv(|x| {
let clamped = x.clamp(-65504.0, 65504.0);
(clamped * 1024.0).round() / 1024.0 })
}
pub fn scale_loss(&self, loss: f32) -> f32 {
if self.fp16_enabled {
loss * self.loss_scaling
} else {
loss
}
}
pub fn unscale_gradients(&self, gradients: &mut Array2<f32>) -> bool {
if !self.fp16_enabled {
return true;
}
if self.overflow_detection {
let has_overflow = gradients.iter().any(|&x| !x.is_finite());
if has_overflow {
warn!("Gradient overflow detected in mixed precision training");
return false;
}
}
gradients.mapv_inplace(|x| x / self.loss_scaling);
true
}
pub fn adjust_loss_scaling(&mut self, overflow_detected: bool) {
if overflow_detected {
self.loss_scaling = (self.loss_scaling / 2.0).max(1.0);
info!("Reduced loss scaling to {}", self.loss_scaling);
} else {
self.loss_scaling = (self.loss_scaling * 1.1).min(65536.0);
}
}
}
pub struct MultiStreamProcessor {
config: GpuAccelerationConfig,
pub stream_ids: Vec<usize>,
current_stream: usize,
}
impl MultiStreamProcessor {
pub fn new(config: GpuAccelerationConfig) -> Self {
let stream_ids = (0..config.num_streams).collect();
Self {
config,
stream_ids,
current_stream: 0,
}
}
pub fn get_next_stream(&mut self) -> usize {
let stream_id = self.stream_ids[self.current_stream];
self.current_stream = (self.current_stream + 1) % self.stream_ids.len();
stream_id
}
pub async fn process_batch_parallel(
&mut self,
entities: Vec<String>,
process_fn: impl Fn(String, usize) -> Array1<f32> + Send + Sync + Copy + 'static,
) -> Result<Vec<Array1<f32>>> {
let chunk_size = (entities.len() + self.config.num_streams - 1) / self.config.num_streams;
let mut tasks = Vec::new();
for chunk in entities.chunks(chunk_size) {
let stream_id = self.get_next_stream();
let chunk_entities = chunk.to_vec();
let task = tokio::spawn(async move {
let mut results = Vec::new();
for entity in chunk_entities {
let embedding = process_fn(entity, stream_id);
results.push(embedding);
}
results
});
tasks.push(task);
}
let mut all_results = Vec::new();
for task in tasks {
let chunk_results = task.await?;
all_results.extend(chunk_results);
}
Ok(all_results)
}
pub fn synchronize_all(&self) {
debug!("Synchronized {} GPU streams", self.stream_ids.len());
}
}
pub struct GpuAccelerationManager {
config: GpuAccelerationConfig,
memory_pool: GpuMemoryPool,
tensor_cache: TensorCache,
mixed_precision: MixedPrecisionProcessor,
multi_stream: MultiStreamProcessor,
}
impl GpuAccelerationManager {
pub fn new(config: GpuAccelerationConfig) -> Self {
let memory_pool = GpuMemoryPool::new(config.clone());
let tensor_cache = TensorCache::new(config.clone());
let mixed_precision = MixedPrecisionProcessor::new(config.clone());
let multi_stream = MultiStreamProcessor::new(config.clone());
Self {
config,
memory_pool,
tensor_cache,
mixed_precision,
multi_stream,
}
}
pub fn memory_pool(&self) -> &GpuMemoryPool {
&self.memory_pool
}
pub fn tensor_cache(&self) -> &TensorCache {
&self.tensor_cache
}
pub fn mixed_precision(&mut self) -> &mut MixedPrecisionProcessor {
&mut self.mixed_precision
}
pub fn multi_stream(&mut self) -> &mut MultiStreamProcessor {
&mut self.multi_stream
}
pub async fn accelerated_embedding_generation(
&mut self,
entities: Vec<String>,
base_compute_fn: impl Fn(&str) -> Array1<f32> + Send + Sync + Copy + 'static,
) -> Result<Vec<Array1<f32>>> {
if !self.config.enabled {
return Ok(entities.iter().map(|e| base_compute_fn(e)).collect());
}
let results = self
.multi_stream
.process_batch_parallel(entities, move |entity, stream_id| {
debug!("Processing entity {} on stream {}", entity, stream_id);
base_compute_fn(&entity)
})
.await?;
self.multi_stream.synchronize_all();
Ok(results)
}
pub fn get_performance_stats(&self) -> GpuPerformanceStats {
let memory_stats = self.memory_pool.get_stats();
let cache_stats = self.tensor_cache.get_stats();
GpuPerformanceStats {
memory_allocations: memory_stats.total_allocations,
memory_deallocations: memory_stats.total_deallocations,
peak_memory_usage_mb: memory_stats.peak_memory_usage / (1024 * 1024),
current_memory_usage_mb: memory_stats.current_memory_usage / (1024 * 1024),
memory_pool_hits: memory_stats.cache_hits,
memory_pool_misses: memory_stats.cache_misses,
tensor_cache_hits: cache_stats.hits,
tensor_cache_misses: cache_stats.misses,
tensor_cache_evictions: cache_stats.evictions,
tensor_cache_memory_mb: cache_stats.total_memory_usage / (1024 * 1024),
loss_scaling_factor: self.mixed_precision.loss_scaling,
num_active_streams: self.config.num_streams,
}
}
}
#[derive(Debug, Serialize)]
pub struct GpuPerformanceStats {
pub memory_allocations: usize,
pub memory_deallocations: usize,
pub peak_memory_usage_mb: usize,
pub current_memory_usage_mb: usize,
pub memory_pool_hits: usize,
pub memory_pool_misses: usize,
pub tensor_cache_hits: usize,
pub tensor_cache_misses: usize,
pub tensor_cache_evictions: usize,
pub tensor_cache_memory_mb: usize,
pub loss_scaling_factor: f32,
pub num_active_streams: usize,
}
pub struct MemoryDefragmenter {
config: GpuAccelerationConfig,
defrag_threshold: f32,
last_defrag: Instant,
defrag_interval: Duration,
}
impl MemoryDefragmenter {
pub fn new(config: GpuAccelerationConfig) -> Self {
Self {
config,
defrag_threshold: 0.7, last_defrag: Instant::now(),
defrag_interval: Duration::from_secs(300), }
}
pub fn should_defragment(&self, memory_pool: &GpuMemoryPool) -> bool {
let stats = memory_pool.get_stats();
let fragmentation_ratio = self.calculate_fragmentation_ratio(&stats);
fragmentation_ratio > self.defrag_threshold
&& self.last_defrag.elapsed() > self.defrag_interval
}
fn calculate_fragmentation_ratio(&self, stats: &AllocationStats) -> f32 {
if stats.current_memory_usage == 0 {
return 0.0;
}
let theoretical_optimal = stats.current_memory_usage;
let actual_allocated = stats.peak_memory_usage;
if actual_allocated == 0 {
0.0
} else {
1.0 - (theoretical_optimal as f32 / actual_allocated as f32)
}
}
pub fn defragment(&mut self, memory_pool: &GpuMemoryPool) -> Result<DefragmentationResult> {
info!("Starting GPU memory defragmentation");
let start_time = Instant::now();
std::thread::sleep(Duration::from_millis(100));
let stats_before = memory_pool.get_stats();
let stats_after = memory_pool.get_stats();
self.last_defrag = Instant::now();
let result = DefragmentationResult {
duration: start_time.elapsed(),
memory_freed: stats_before
.peak_memory_usage
.saturating_sub(stats_after.current_memory_usage),
fragmentation_before: self.calculate_fragmentation_ratio(&stats_before),
fragmentation_after: self.calculate_fragmentation_ratio(&stats_after),
};
info!("Defragmentation completed: {:?}", result);
Ok(result)
}
}
#[derive(Debug, Clone)]
pub struct DefragmentationResult {
pub duration: Duration,
pub memory_freed: usize,
pub fragmentation_before: f32,
pub fragmentation_after: f32,
}
pub struct OutOfCoreProcessor {
config: GpuAccelerationConfig,
chunk_size: usize,
overlap_size: usize,
memory_limit: usize,
}
impl OutOfCoreProcessor {
pub fn new(config: GpuAccelerationConfig) -> Self {
let memory_limit = config.memory_pool_size_mb * 1024 * 1024; let chunk_size = memory_limit / 4; let overlap_size = chunk_size / 10;
Self {
config,
chunk_size,
overlap_size,
memory_limit,
}
}
pub async fn process_large_batch<T>(
&self,
data: Vec<T>,
process_fn: impl Fn(&[T]) -> Result<Vec<Array1<f32>>> + Send + Sync + Copy,
) -> Result<Vec<Array1<f32>>>
where
T: Clone + Send + Sync + 'static,
{
if data.is_empty() {
return Ok(Vec::new());
}
let item_size = std::mem::size_of::<T>();
let max_items_per_chunk = self.chunk_size / item_size;
let chunk_size = max_items_per_chunk.clamp(1, 1000);
info!(
"Processing {} items in chunks of {}",
data.len(),
chunk_size
);
let mut results = Vec::new();
let mut processed_count = 0;
for chunk in data.chunks(chunk_size) {
let chunk_results = process_fn(chunk)?;
results.extend(chunk_results);
processed_count += chunk.len();
if processed_count % (chunk_size * 10) == 0 {
info!("Processed {}/{} items", processed_count, data.len());
}
tokio::task::yield_now().await;
}
Ok(results)
}
pub async fn process_with_overlap<T>(
&self,
data: Vec<T>,
process_fn: impl Fn(&[T]) -> Result<Vec<Array1<f32>>> + Send + Sync + Copy,
) -> Result<Vec<Array1<f32>>>
where
T: Clone + Send + Sync + 'static,
{
if data.is_empty() {
return Ok(Vec::new());
}
let item_size = std::mem::size_of::<T>();
let max_items_per_chunk = self.chunk_size / item_size;
let chunk_size = max_items_per_chunk.clamp(1, 1000);
let mut results = Vec::new();
let mut start_idx = 0;
while start_idx < data.len() {
let end_idx = (start_idx + chunk_size).min(data.len());
let chunk = &data[start_idx..end_idx];
let chunk_results = process_fn(chunk)?;
let take_count = if start_idx == 0 {
chunk_results.len()
} else {
chunk_results
.len()
.saturating_sub(self.overlap_size / item_size)
};
results.extend(chunk_results.into_iter().take(take_count));
start_idx += chunk_size - self.overlap_size / item_size;
tokio::task::yield_now().await;
}
Ok(results)
}
}
pub struct DynamicShapeHandler {
config: GpuAccelerationConfig,
shape_cache: HashMap<Vec<usize>, ShapeInfo>,
max_cached_shapes: usize,
}
#[derive(Debug, Clone)]
struct ShapeInfo {
shape: Vec<usize>,
memory_requirement: usize,
optimal_batch_size: usize,
last_used: Instant,
}
impl DynamicShapeHandler {
pub fn new(config: GpuAccelerationConfig) -> Self {
Self {
config,
shape_cache: HashMap::new(),
max_cached_shapes: 100,
}
}
pub fn optimize_shape(&mut self, shape: Vec<usize>) -> Vec<usize> {
if let Some(shape_info) = self.shape_cache.get_mut(&shape) {
shape_info.last_used = Instant::now();
return shape_info.shape.clone();
}
let optimized_shape = self.calculate_optimal_shape(&shape);
self.cache_shape_info(shape.clone(), optimized_shape.clone());
optimized_shape
}
fn calculate_optimal_shape(&self, shape: &[usize]) -> Vec<usize> {
let mut optimized = shape.to_vec();
const WARP_SIZE: usize = 32;
for dim in &mut optimized {
if *dim > 0 {
*dim = ((*dim + WARP_SIZE - 1) / WARP_SIZE) * WARP_SIZE;
}
}
optimized
}
fn cache_shape_info(&mut self, original_shape: Vec<usize>, optimized_shape: Vec<usize>) {
if self.shape_cache.len() >= self.max_cached_shapes {
self.evict_oldest_shape();
}
let memory_requirement = optimized_shape.iter().product::<usize>() * 4; let optimal_batch_size = self.calculate_optimal_batch_size(memory_requirement);
let shape_info = ShapeInfo {
shape: optimized_shape,
memory_requirement,
optimal_batch_size,
last_used: Instant::now(),
};
self.shape_cache.insert(original_shape, shape_info);
}
fn calculate_optimal_batch_size(&self, memory_per_item: usize) -> usize {
if memory_per_item == 0 {
return 1;
}
let available_memory = (self.config.memory_pool_size_mb * 1024 * 1024) / 2; let max_batch_size = available_memory / memory_per_item;
max_batch_size.clamp(1, 1024)
}
fn evict_oldest_shape(&mut self) {
if let Some(oldest_key) = self
.shape_cache
.iter()
.min_by_key(|(_, info)| info.last_used)
.map(|(key, _)| key.clone())
{
self.shape_cache.remove(&oldest_key);
}
}
pub fn get_optimal_batch_size(&self, shape: &[usize]) -> usize {
self.shape_cache
.get(shape)
.map(|info| info.optimal_batch_size)
.unwrap_or(1)
}
}
pub struct BatchSizeOptimizer {
config: GpuAccelerationConfig,
performance_history: VecDeque<BatchPerformance>,
max_history_size: usize,
current_optimal_batch_size: usize,
}
#[derive(Debug, Clone)]
struct BatchPerformance {
batch_size: usize,
processing_time: Duration,
memory_usage: usize,
throughput: f64, gpu_utilization: f64,
timestamp: Instant,
}
impl BatchSizeOptimizer {
pub fn new(config: GpuAccelerationConfig) -> Self {
Self {
config,
performance_history: VecDeque::new(),
max_history_size: 50,
current_optimal_batch_size: 32, }
}
pub async fn find_optimal_batch_size<T>(
&mut self,
sample_data: Vec<T>,
process_fn: impl Fn(&[T]) -> Result<Vec<Array1<f32>>> + Send + Sync + Copy,
) -> Result<usize>
where
T: Clone + Send + Sync + 'static,
{
if sample_data.is_empty() {
return Ok(1);
}
info!("Optimizing batch size for embedding generation");
let test_sizes = vec![1, 8, 16, 32, 64, 128, 256, 512];
let max_test_size = sample_data.len().min(512);
let mut best_batch_size = 1;
let mut best_throughput = 0.0;
for &batch_size in &test_sizes {
if batch_size > max_test_size {
break;
}
let performance = self
.test_batch_size(
&sample_data[..batch_size.min(sample_data.len())],
batch_size,
process_fn,
)
.await?;
info!(
"Batch size {}: {:.2} items/sec, {:.1}ms processing time",
batch_size,
performance.throughput,
performance.processing_time.as_millis()
);
if performance.throughput > best_throughput {
best_throughput = performance.throughput;
best_batch_size = batch_size;
}
self.performance_history.push_back(performance);
if self.performance_history.len() > self.max_history_size {
self.performance_history.pop_front();
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
self.current_optimal_batch_size = best_batch_size;
info!("Optimal batch size determined: {}", best_batch_size);
Ok(best_batch_size)
}
async fn test_batch_size<T>(
&self,
sample_data: &[T],
batch_size: usize,
process_fn: impl Fn(&[T]) -> Result<Vec<Array1<f32>>>,
) -> Result<BatchPerformance>
where
T: Clone,
{
let start_time = Instant::now();
let memory_before = self.estimate_memory_usage();
let _results = process_fn(sample_data)?;
let processing_time = start_time.elapsed();
let memory_after = self.estimate_memory_usage();
let memory_usage = memory_after.saturating_sub(memory_before);
let throughput = if processing_time.as_secs_f64() > 0.0 {
sample_data.len() as f64 / processing_time.as_secs_f64()
} else {
0.0
};
let gpu_utilization = self.estimate_gpu_utilization(batch_size, processing_time);
Ok(BatchPerformance {
batch_size,
processing_time,
memory_usage,
throughput,
gpu_utilization,
timestamp: Instant::now(),
})
}
fn estimate_memory_usage(&self) -> usize {
(self.config.memory_pool_size_mb * 1024 * 1024) / 4 }
fn estimate_gpu_utilization(&self, batch_size: usize, processing_time: Duration) -> f64 {
let base_utilization = (batch_size as f64).log2() / 10.0; let time_factor = if processing_time.as_millis() < 10 {
0.5 } else if processing_time.as_millis() > 1000 {
0.7 } else {
1.0
};
(base_utilization * time_factor).clamp(0.0, 1.0)
}
pub fn get_optimal_batch_size(&self) -> usize {
self.current_optimal_batch_size
}
pub fn get_performance_stats(&self) -> BatchSizeOptimizerStats {
let avg_throughput = if !self.performance_history.is_empty() {
self.performance_history
.iter()
.map(|p| p.throughput)
.sum::<f64>()
/ self.performance_history.len() as f64
} else {
0.0
};
let avg_gpu_utilization = if !self.performance_history.is_empty() {
self.performance_history
.iter()
.map(|p| p.gpu_utilization)
.sum::<f64>()
/ self.performance_history.len() as f64
} else {
0.0
};
BatchSizeOptimizerStats {
current_optimal_batch_size: self.current_optimal_batch_size,
avg_throughput,
avg_gpu_utilization,
total_tests_performed: self.performance_history.len(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchSizeOptimizerStats {
pub current_optimal_batch_size: usize,
pub avg_throughput: f64,
pub avg_gpu_utilization: f64,
pub total_tests_performed: usize,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_gpu_acceleration_config_default() {
let config = GpuAccelerationConfig::default();
assert!(config.enabled);
assert_eq!(config.device_ids, vec![0]);
assert_eq!(config.memory_pool_size_mb, 2048);
assert!(config.mixed_precision);
assert!(config.tensor_caching);
}
#[test]
fn test_memory_pool_allocation() {
let config = GpuAccelerationConfig::default();
let pool = GpuMemoryPool::new(config);
let block_id = pool.allocate(1024, 0).expect("should succeed");
assert!(block_id > 0);
pool.deallocate(block_id).expect("should succeed");
let block_id2 = pool.allocate(1024, 0).expect("should succeed");
assert_eq!(block_id, block_id2);
}
#[test]
fn test_tensor_cache() {
let config = GpuAccelerationConfig::default();
let cache = TensorCache::new(config);
let tensor = Array2::zeros((10, 20));
cache.cache_entity_tensor("test_entity", tensor.clone(), 0);
let cached = cache
.get_entity_tensor("test_entity")
.expect("should succeed");
assert_eq!(cached.shape(), tensor.shape());
}
#[test]
fn test_mixed_precision() {
let config = GpuAccelerationConfig::default();
let processor = MixedPrecisionProcessor::new(config);
let tensor = Array2::from_elem((2, 2), 1.0001);
let fp16_tensor = processor.to_fp16(&tensor);
if processor.fp16_enabled {
assert!(fp16_tensor[[0, 0]] != tensor[[0, 0]]);
} else {
assert_eq!(fp16_tensor[[0, 0]], tensor[[0, 0]]);
}
}
#[tokio::test]
async fn test_multi_stream_processing() {
let config = GpuAccelerationConfig::default();
let mut processor = MultiStreamProcessor::new(config);
let entities = vec!["entity1".to_string(), "entity2".to_string()];
let process_fn = |entity: String, _stream_id: usize| -> Array1<f32> {
Array1::from_vec(vec![entity.len() as f32])
};
let results = processor
.process_batch_parallel(entities, process_fn)
.await
.expect("should succeed");
assert_eq!(results.len(), 2);
}
#[test]
fn test_scirs2_gpu_accelerator() {
let config = GpuAccelerationConfig::default();
match SciRS2GpuAccelerator::new(config) {
Ok(accelerator) => {
assert!(accelerator.num_devices() > 0);
}
Err(_) => {
println!("Skipping GPU test: no hardware available");
}
}
}
#[test]
fn test_tensor_core_operations() {
let config = GpuAccelerationConfig::default();
if let Ok(accelerator) = SciRS2GpuAccelerator::new(config) {
let _matrix_a = Array2::<f32>::ones((256, 512));
let _matrix_b = Array2::<f32>::ones((512, 256));
let stats = accelerator.get_stats();
assert_eq!(stats.total_operations, 0);
} else {
println!("Skipping tensor core test: no GPU hardware available");
}
}
}
pub struct SciRS2GpuAccelerator {
config: GpuAccelerationConfig,
contexts: Vec<GpuContext>,
operations: Arc<AtomicUsize>,
}
impl SciRS2GpuAccelerator {
pub fn new(config: GpuAccelerationConfig) -> Result<Self> {
let mut contexts = Vec::new();
for _device_id in &config.device_ids {
match GpuContext::new(GpuBackend::Cuda) {
Ok(ctx) => {
info!("Initialized GPU context");
contexts.push(ctx);
}
Err(e) => {
warn!("Failed to initialize GPU device: {}", e);
}
}
}
if contexts.is_empty() {
return Err(anyhow!("No GPU devices available for acceleration"));
}
Ok(Self {
config,
contexts,
operations: Arc::new(AtomicUsize::new(0)),
})
}
pub fn num_devices(&self) -> usize {
self.contexts.len()
}
pub fn tensor_core_gemm(
&self,
a: &Array2<f32>,
b: &Array2<f32>,
use_mixed_precision: bool,
) -> Result<Array2<f32>> {
let result = if use_mixed_precision && self.config.mixed_precision {
a.dot(b)
} else {
a.dot(b)
};
self.operations.fetch_add(1, Ordering::Relaxed);
Ok(result)
}
pub fn batch_embed(
&self,
inputs: &[Array1<f32>],
embedding_matrix: &Array2<f32>,
) -> Result<Vec<Array1<f32>>> {
let batch_size = inputs.len();
let mut results = Vec::with_capacity(batch_size);
let stream_batch_size = if self.config.multi_stream {
(batch_size + self.config.num_streams - 1) / self.config.num_streams
} else {
batch_size
};
for chunk in inputs.chunks(stream_batch_size) {
for input in chunk {
let embedding = embedding_matrix.dot(input);
results.push(embedding);
}
}
self.operations.fetch_add(batch_size, Ordering::Relaxed);
Ok(results)
}
pub fn simd_similarity(
&self,
query: &Array1<f32>,
candidates: &[Array1<f32>],
) -> Result<Vec<f32>> {
let similarities: Vec<f32> = candidates
.iter()
.map(|candidate| {
query.dot(candidate)
})
.collect();
self.operations
.fetch_add(candidates.len(), Ordering::Relaxed);
Ok(similarities)
}
pub fn get_stats(&self) -> AcceleratorStats {
AcceleratorStats {
total_operations: self.operations.load(Ordering::Relaxed),
num_devices: self.contexts.len(),
profiler_report: "Stats available".to_string(),
}
}
pub fn clear_stats(&self) {
self.operations.store(0, Ordering::Relaxed);
}
}
#[derive(Debug, Clone)]
pub struct AcceleratorStats {
pub total_operations: usize,
pub num_devices: usize,
pub profiler_report: String,
}