use crate::{
faiss_integration::{FaissConfig, FaissSearchParams},
gpu::GpuExecutionConfig,
};
use anyhow::{Error as AnyhowError, Result};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, RwLock,
};
use std::time::{Duration, Instant};
use tokio::sync::oneshot;
use tracing::{debug, error, info, span, warn, Level};
/// Top-level configuration for the GPU-accelerated FAISS index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FaissGpuConfig {
    /// Device ordinals this index may use.
    pub device_ids: Vec<i32>,
    /// Memory budget per device, in bytes (sizes each `FaissGpuMemoryPool`).
    pub memory_per_device: usize,
    /// When true, the cross-device load-balancer worker is started.
    pub enable_multi_gpu: bool,
    /// How device memory is provisioned.
    pub memory_strategy: GpuMemoryStrategy,
    /// Per-device compute-stream layout.
    pub stream_config: GpuStreamConfig,
    /// Kernel/batching optimization knobs.
    pub optimization: GpuOptimizationConfig,
    /// Retry/fallback behavior on GPU errors.
    pub error_handling: GpuErrorConfig,
}
/// Strategy for provisioning GPU memory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GpuMemoryStrategy {
    /// Pre-allocated pool of fixed size.
    FixedPool,
    /// Allocate on demand.
    Dynamic,
    /// Unified (CPU/GPU-shared) memory; selecting this also flips
    /// `enable_unified_memory` in the derived base GPU config.
    Unified,
    /// Stream data through fixed-size chunks.
    Streaming { chunk_size: usize },
}
/// Layout of compute streams created per device.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuStreamConfig {
    /// Number of `GpuComputeStream`s created for each device.
    pub streams_per_device: usize,
    /// Whether compute/transfer overlap is desired.
    pub enable_overlapping: bool,
    /// Priorities handed out round-robin to the streams.
    /// NOTE(review): stream creation does `i % priority_levels.len()`, so an
    /// empty vec would panic — confirm callers always supply at least one.
    pub priority_levels: Vec<i32>,
    /// Stream synchronization policy.
    pub sync_strategy: SyncStrategy,
}
/// Stream synchronization policy. Not consulted by the simulated execution
/// path in this file; carried for future use.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SyncStrategy {
    Blocking,
    NonBlocking,
    EventBased,
    Cooperative,
}
/// Kernel- and batching-level optimization switches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuOptimizationConfig {
    /// Allow tensor-core execution where available.
    pub enable_tensor_cores: bool,
    /// Allow reduced-precision math.
    pub enable_mixed_precision: bool,
    /// Request coalesced memory access patterns.
    pub enable_coalescing: bool,
    /// Allow fusing adjacent kernels.
    pub enable_kernel_fusion: bool,
    /// Cache-preference settings.
    pub cache_config: GpuCacheConfig,
    /// Batch sizing policy per operation type.
    pub batch_optimization: BatchOptimizationConfig,
}
/// On-device cache preferences.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuCacheConfig {
    /// Preference for the L1 cache.
    pub l1_cache_config: CacheConfig,
    /// Preference for shared memory.
    pub shared_memory_config: CacheConfig,
    /// Whether to request prefetching.
    pub enable_prefetching: bool,
    /// Cache line size in bytes (default config uses 128).
    pub cache_line_size: usize,
}
/// Cache partitioning preference for a single cache level.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CacheConfig {
    PreferL1,
    PreferShared,
    Equal,
    Disabled,
}
/// Batch sizing policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOptimizationConfig {
    /// Preferred batch size keyed by operation name
    /// (defaults cover "search", "add", "train").
    pub optimal_batch_sizes: HashMap<String, usize>,
    /// Allow adjusting batch size at runtime.
    pub enable_dynamic_batching: bool,
    /// Minimum number of items before requests are coalesced.
    pub coalescence_threshold: usize,
    /// Hard upper bound on any batch.
    pub max_batch_size: usize,
}
/// Error-handling policy for GPU operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuErrorConfig {
    /// Attempt automatic recovery after failures.
    pub enable_auto_recovery: bool,
    /// Maximum retry attempts before giving up.
    pub max_retries: usize,
    /// Fall back to CPU execution when GPU execution fails.
    pub fallback_to_cpu: bool,
    /// Log level name for GPU errors (default config uses "warn").
    pub error_logging_level: String,
}
impl Default for FaissGpuConfig {
    /// Sensible single-GPU defaults: device 0, 2 GiB pool, dynamic memory,
    /// four non-blocking streams, aggressive optimization, CPU fallback on.
    fn default() -> Self {
        // Default batch sizes keyed by operation name.
        let mut optimal_batch_sizes = HashMap::new();
        optimal_batch_sizes.insert("search".to_string(), 1024);
        optimal_batch_sizes.insert("add".to_string(), 512);
        optimal_batch_sizes.insert("train".to_string(), 256);

        let stream_config = GpuStreamConfig {
            streams_per_device: 4,
            enable_overlapping: true,
            priority_levels: vec![0, 1, 2],
            sync_strategy: SyncStrategy::NonBlocking,
        };

        let optimization = GpuOptimizationConfig {
            enable_tensor_cores: true,
            enable_mixed_precision: true,
            enable_coalescing: true,
            enable_kernel_fusion: true,
            cache_config: GpuCacheConfig {
                l1_cache_config: CacheConfig::PreferL1,
                shared_memory_config: CacheConfig::PreferShared,
                enable_prefetching: true,
                cache_line_size: 128,
            },
            batch_optimization: BatchOptimizationConfig {
                optimal_batch_sizes,
                enable_dynamic_batching: true,
                coalescence_threshold: 64,
                max_batch_size: 4096,
            },
        };

        let error_handling = GpuErrorConfig {
            enable_auto_recovery: true,
            max_retries: 3,
            fallback_to_cpu: true,
            error_logging_level: "warn".to_string(),
        };

        Self {
            device_ids: vec![0],
            // 2 GiB per device.
            memory_per_device: 2 * 1024 * 1024 * 1024,
            enable_multi_gpu: false,
            memory_strategy: GpuMemoryStrategy::Dynamic,
            stream_config,
            optimization,
            error_handling,
        }
    }
}
/// GPU-accelerated FAISS index: public API enqueues `GpuOperation`s onto the
/// shared work queue; detached background tasks drain the queue, execute, and
/// reply over per-operation oneshot channels.
pub struct FaissGpuIndex {
    // Base FAISS configuration (held; not read in this file's visible paths).
    faiss_config: FaissConfig,
    // GPU-specific configuration, cloned into the background workers.
    gpu_config: FaissGpuConfig,
    // Shared runtime handle; currently `GpuExecutionConfig::default()`.
    gpu_runtime: Arc<GpuExecutionConfig>,
    // One memory pool per device id.
    memory_pools: Arc<RwLock<HashMap<i32, FaissGpuMemoryPool>>>,
    // Compute streams per device id.
    compute_streams: Arc<RwLock<HashMap<i32, Vec<GpuComputeStream>>>>,
    // Aggregated performance statistics, refreshed by the monitor task.
    stats: Arc<RwLock<GpuPerformanceStats>>,
    // FIFO of pending operations, drained by the processor task.
    work_queue: Arc<Mutex<VecDeque<GpuOperation>>>,
    // Result cache (declared; not read in this file's visible paths).
    results_cache: Arc<RwLock<HashMap<String, CachedResult>>>,
    // Device-selection state, refreshed by the balancer task.
    load_balancer: Arc<RwLock<GpuLoadBalancer>>,
}
/// Bookkeeping for one device's memory budget. Tracks a byte counter and
/// block maps; no actual device allocation happens in this file.
#[derive(Debug)]
pub struct FaissGpuMemoryPool {
    /// Owning device ordinal.
    pub device_id: i32,
    /// Capacity of the pool in bytes.
    pub total_size: usize,
    /// Currently reserved bytes; updated atomically by allocate/deallocate.
    pub allocated_size: AtomicUsize,
    /// Free blocks bucketed by size (consulted by fragmentation metric).
    pub free_blocks: Arc<Mutex<BTreeMap<usize, Vec<GpuMemoryBlock>>>>,
    /// Live blocks keyed by address (scanned by the leak audit).
    pub allocated_blocks: Arc<RwLock<HashMap<usize, GpuMemoryBlock>>>,
    /// Counters for allocations, peak usage, OOMs, etc.
    pub allocation_stats: Arc<RwLock<AllocationStatistics>>,
}
/// One reserved region within a pool.
#[derive(Debug)]
pub struct GpuMemoryBlock {
    /// Address of the block — in this file, a byte offset within the pool,
    /// not a real device pointer.
    pub gpu_address: usize,
    /// Size in bytes (aligned up at allocation time).
    pub size: usize,
    /// When the block was handed out; used by the leak audit.
    pub allocated_at: Instant,
    /// Reference count; snapshotted (not shared) by `Clone`.
    pub ref_count: AtomicUsize,
    /// What the block holds.
    pub block_type: MemoryBlockType,
}
impl Clone for GpuMemoryBlock {
fn clone(&self) -> Self {
Self {
gpu_address: self.gpu_address,
size: self.size,
allocated_at: self.allocated_at,
ref_count: AtomicUsize::new(self.ref_count.load(Ordering::Relaxed)),
block_type: self.block_type,
}
}
}
/// Category of data held by a memory block.
#[derive(Debug, Clone, Copy)]
pub enum MemoryBlockType {
    Vectors,
    IndexData,
    Temporary,
    Results,
}
/// Descriptor for one compute stream on a device.
#[derive(Debug)]
pub struct GpuComputeStream {
    /// Index of this stream within its device's stream set.
    pub stream_id: usize,
    /// Owning device ordinal.
    pub device_id: i32,
    /// Synthetic handle (`device_id * 1000 + stream_id`); no driver stream
    /// is created in this file.
    pub stream_handle: usize,
    /// Scheduling priority taken from `GpuStreamConfig::priority_levels`.
    pub priority: i32,
    /// Operation currently assigned to this stream, if any.
    pub current_operation: Arc<Mutex<Option<GpuOperation>>>,
    /// Recently completed operations.
    pub operation_history: Arc<RwLock<VecDeque<CompletedOperation>>>,
    /// Utilization figures consulted by stream selection.
    pub utilization: Arc<RwLock<StreamUtilization>>,
}
/// A unit of work queued for GPU execution.
#[derive(Debug)]
pub struct GpuOperation {
    /// Unique id (public API generates UUIDs).
    pub id: String,
    /// What to do, with its payload.
    pub operation_type: GpuOperationType,
    /// Input payload accompanying the operation.
    pub input_data: GpuOperationData,
    /// Expected output size in bytes.
    pub output_size: usize,
    /// Scheduling priority (lower values used for optimize in this file).
    pub priority: i32,
    /// Deadline for the operation.
    /// NOTE(review): set by callers but not enforced by the visible
    /// execution path — confirm.
    pub timeout: Option<Duration>,
    /// Channel the worker uses to deliver the result to the caller;
    /// `None` for fire-and-forget operations.
    pub result_sender: Option<oneshot::Sender<GpuOperationResult>>,
}
/// The kinds of work the GPU pipeline accepts.
#[derive(Debug, Clone)]
pub enum GpuOperationType {
    /// k-NN search over the given query vectors.
    Search {
        query_vectors: Vec<Vec<f32>>,
        k: usize,
        search_params: FaissSearchParams,
    },
    /// Insert vectors with their string ids.
    Add {
        vectors: Vec<Vec<f32>>,
        ids: Vec<String>,
    },
    /// Train the index on sample vectors.
    Train { training_vectors: Vec<Vec<f32>> },
    /// Run index optimization, returning metrics.
    Optimize,
    /// Move `size` bytes between memory locations.
    MemoryTransfer {
        source: TransferSource,
        destination: TransferDestination,
        size: usize,
    },
}
/// Input payload attached to a `GpuOperation`.
#[derive(Debug, Clone)]
pub enum GpuOperationData {
    /// Dense float vectors.
    Vectors(Vec<Vec<f32>>),
    /// Serialized index bytes.
    IndexData(Vec<u8>),
    /// Opaque per-query parameters keyed by name.
    QueryParams(HashMap<String, Vec<u8>>),
    /// No payload (e.g. `Optimize`).
    Empty,
}
/// Where a memory transfer reads from.
#[derive(Debug, Clone)]
pub enum TransferSource {
    /// Bytes already resident in host memory.
    CpuMemory(Vec<u8>),
    /// A region on a specific device.
    GpuMemory { device_id: i32, address: usize },
    /// A file on disk.
    Disk(std::path::PathBuf),
}
/// Where a memory transfer writes to.
#[derive(Debug, Clone)]
pub enum TransferDestination {
    /// Host memory.
    CpuMemory,
    /// A region on a specific device.
    GpuMemory { device_id: i32, address: usize },
    /// A file on disk.
    Disk(std::path::PathBuf),
}
/// Outcome of one executed `GpuOperation`, sent back over the oneshot channel.
#[derive(Debug, Clone)]
pub struct GpuOperationResult {
    /// Id of the operation this result belongs to.
    pub operation_id: String,
    /// Whether execution succeeded; callers check this before `result_data`.
    pub success: bool,
    /// Typed result payload.
    pub result_data: GpuResultData,
    /// Wall-clock execution time measured by the worker.
    pub execution_time: Duration,
    /// Bytes of memory used (placeholder figure in this file).
    pub memory_used: usize,
    /// Human-readable failure reason when `success` is false.
    pub error_message: Option<String>,
}
/// Typed result payload, one variant per operation kind.
#[derive(Debug, Clone)]
pub enum GpuResultData {
    /// Per-query `(id, score)` hit lists.
    SearchResults(Vec<Vec<(String, f32)>>),
    TrainingComplete,
    AdditionComplete,
    /// Named optimization metrics.
    OptimizationMetrics(HashMap<String, f64>),
    TransferComplete,
    /// Failure description.
    Error(String),
}
/// History record of a finished operation, kept per stream.
#[derive(Debug, Clone)]
pub struct CompletedOperation {
    pub operation_id: String,
    /// Operation kind as a display string.
    pub operation_type: String,
    pub start_time: Instant,
    pub end_time: Instant,
    pub success: bool,
    pub memory_used: usize,
}
/// Rolling utilization figures for a single stream. The scheduler routes new
/// work to the stream with the lowest `utilization_percentage`.
#[derive(Debug, Clone, Default)]
pub struct StreamUtilization {
    pub total_operations: usize,
    pub total_execution_time: Duration,
    pub avg_execution_time: Duration,
    /// 0-100; compared across streams during selection.
    pub utilization_percentage: f32,
    pub idle_time: Duration,
}
/// Aggregated performance picture across all devices, refreshed by the
/// performance-monitor background task.
#[derive(Debug, Clone, Default)]
pub struct GpuPerformanceStats {
    /// Per-device metrics keyed by device id.
    pub device_stats: HashMap<i32, DeviceStats>,
    /// Mean of per-device utilization.
    pub overall_utilization: f32,
    pub memory_efficiency: f32,
    pub throughput: ThroughputMetrics,
    pub error_stats: ErrorStatistics,
    pub performance_trends: PerformanceTrends,
}
/// Metrics for one device (populated with synthetic values in this file;
/// no hardware is queried).
#[derive(Debug, Clone, Default)]
pub struct DeviceStats {
    /// Utilization percentage.
    pub utilization: f32,
    pub memory_usage: MemoryUsageStats,
    pub compute_performance: ComputePerformanceStats,
    /// Watts.
    pub power_consumption: f32,
    /// Degrees (units not stated in SOURCE; presumably Celsius — confirm).
    pub temperature: f32,
}
/// Device memory occupancy snapshot (bytes).
#[derive(Debug, Clone, Default)]
pub struct MemoryUsageStats {
    pub total_memory: usize,
    pub used_memory: usize,
    pub free_memory: usize,
    pub peak_usage: usize,
    /// Fragmentation percentage.
    pub fragmentation: f32,
}
/// Compute-side performance figures for a device.
#[derive(Debug, Clone, Default)]
pub struct ComputePerformanceStats {
    /// Floating-point operations per second.
    pub flops: f64,
    /// Percentage of available memory bandwidth in use.
    pub memory_bandwidth_utilization: f32,
    pub kernel_efficiency: f32,
    pub occupancy: f32,
}
/// Aggregate throughput counters.
#[derive(Debug, Clone, Default)]
pub struct ThroughputMetrics {
    pub vectors_per_second: f64,
    /// NOTE(review): accumulated per completed operation and never reset in
    /// the visible code — reads more like a running sum than a rate.
    pub operations_per_second: f64,
    pub transfer_rate_mbps: f64,
    pub search_qps: f64,
}
/// Error counters across all GPU operations.
#[derive(Debug, Clone, Default)]
pub struct ErrorStatistics {
    pub total_errors: usize,
    pub recoverable_errors: usize,
    pub fatal_errors: usize,
    pub error_rate: f32,
    pub recovery_rate: f32,
}
/// Time-series of key metrics for trend analysis.
#[derive(Debug, Clone, Default)]
pub struct PerformanceTrends {
    pub utilization_trend: Vec<(Instant, f32)>,
    pub throughput_trend: Vec<(Instant, f64)>,
    pub memory_trend: Vec<(Instant, usize)>,
    pub error_trend: Vec<(Instant, f32)>,
}
/// Counters maintained by `FaissGpuMemoryPool`.
#[derive(Debug, Clone, Default)]
pub struct AllocationStatistics {
    pub total_allocations: usize,
    pub total_deallocations: usize,
    /// Highest observed `allocated_size`, in bytes.
    pub peak_usage: usize,
    pub avg_allocation_size: usize,
    pub fragmentation_events: usize,
    /// Failed allocations due to pool exhaustion.
    pub oom_events: usize,
}
/// Entry in the results cache.
#[derive(Debug)]
pub struct CachedResult {
    /// The cached result payload.
    pub data: GpuResultData,
    /// When the entry was cached.
    pub timestamp: Instant,
    /// Times this entry was served; snapshotted (not shared) by `Clone`.
    pub hit_count: AtomicUsize,
    /// Approximate size of the payload in bytes.
    pub size: usize,
}
impl Clone for CachedResult {
fn clone(&self) -> Self {
Self {
data: self.data.clone(),
timestamp: self.timestamp,
hit_count: AtomicUsize::new(self.hit_count.load(Ordering::Acquire)),
size: self.size,
}
}
}
/// Device-selection state for multi-GPU scheduling.
#[derive(Debug)]
pub struct GpuLoadBalancer {
    /// Latest utilization per device (copied from `GpuPerformanceStats`).
    pub device_utilization: HashMap<i32, f32>,
    /// Operations handed to each device; summed for round-robin.
    pub workload_distribution: HashMap<i32, usize>,
    /// Which selection policy `select_device` uses.
    pub strategy: LoadBalancingStrategy,
    /// Bounded (100-entry) snapshot history per device.
    pub performance_history: HashMap<i32, VecDeque<PerformanceSnapshot>>,
}
/// Device-selection policy.
#[derive(Debug, Clone)]
pub enum LoadBalancingStrategy {
    /// Cycle through devices by total dispatched count.
    RoundRobin,
    /// Lowest current utilization wins.
    LoadBased,
    /// Best recent throughput/latency score wins.
    PerformanceBased,
    /// Memory-aware placement (currently same policy as `LoadBased`).
    MemoryAware,
    /// Per-operation-type mix of the above.
    Hybrid,
}
/// One point in a device's performance history.
#[derive(Debug, Clone)]
pub struct PerformanceSnapshot {
    pub timestamp: Instant,
    /// Utilization percentage at snapshot time.
    pub utilization: f32,
    /// Used/total memory as a percentage.
    pub memory_usage: f32,
    pub ops_per_second: f64,
    pub avg_latency: Duration,
}
impl FaissGpuIndex {
    /// Creates the index: builds one memory pool and one stream set per
    /// configured device, wires up the shared state, and starts the
    /// background workers before returning.
    ///
    /// NOTE(review): `_base_gpu_config` is constructed but never used, and
    /// the runtime is just `GpuExecutionConfig::default()` — presumably a
    /// placeholder until real GPU initialization lands; confirm.
    pub async fn new(faiss_config: FaissConfig, gpu_config: FaissGpuConfig) -> Result<Self> {
        let span = span!(Level::INFO, "faiss_gpu_index_new");
        let _enter = span.enter();
        let _base_gpu_config = crate::gpu::GpuConfig {
            device_id: gpu_config.device_ids.first().copied().unwrap_or(0),
            enable_mixed_precision: true,
            enable_tensor_cores: true,
            batch_size: 1024,
            memory_pool_size: gpu_config.memory_per_device,
            stream_count: gpu_config.stream_config.streams_per_device,
            enable_peer_access: gpu_config.enable_multi_gpu,
            enable_unified_memory: matches!(gpu_config.memory_strategy, GpuMemoryStrategy::Unified),
            enable_async_execution: true,
            enable_multi_gpu: gpu_config.enable_multi_gpu,
            preferred_gpu_ids: gpu_config.device_ids.clone(),
            dynamic_batch_sizing: true,
            enable_memory_compression: false,
            kernel_cache_size: 1024 * 1024,
            optimization_level: crate::gpu::OptimizationLevel::Performance,
            precision_mode: crate::gpu::PrecisionMode::Mixed,
        };
        let gpu_runtime = Arc::new(GpuExecutionConfig::default());
        // One memory pool per configured device.
        let mut memory_pools = HashMap::new();
        for &device_id in &gpu_config.device_ids {
            let pool = FaissGpuMemoryPool::new(device_id, gpu_config.memory_per_device)?;
            memory_pools.insert(device_id, pool);
        }
        // One stream set per configured device.
        let mut compute_streams = HashMap::new();
        for &device_id in &gpu_config.device_ids {
            let streams = Self::create_compute_streams(device_id, &gpu_config.stream_config)?;
            compute_streams.insert(device_id, streams);
        }
        let load_balancer =
            GpuLoadBalancer::new(&gpu_config.device_ids, LoadBalancingStrategy::Hybrid);
        let device_count = gpu_config.device_ids.len();
        let index = Self {
            faiss_config,
            gpu_config,
            gpu_runtime,
            memory_pools: Arc::new(RwLock::new(memory_pools)),
            compute_streams: Arc::new(RwLock::new(compute_streams)),
            stats: Arc::new(RwLock::new(GpuPerformanceStats::default())),
            work_queue: Arc::new(Mutex::new(VecDeque::new())),
            results_cache: Arc::new(RwLock::new(HashMap::new())),
            load_balancer: Arc::new(RwLock::new(load_balancer)),
        };
        index.start_background_workers().await?;
        info!(
            "Created GPU-accelerated FAISS index with {} devices",
            device_count
        );
        Ok(index)
    }

    /// Creates `streams_per_device` stream descriptors for one device,
    /// assigning priorities round-robin from `priority_levels`.
    ///
    /// NOTE(review): panics (modulo by zero) if `priority_levels` is empty.
    fn create_compute_streams(
        device_id: i32,
        stream_config: &GpuStreamConfig,
    ) -> Result<Vec<GpuComputeStream>> {
        let mut streams = Vec::new();
        for i in 0..stream_config.streams_per_device {
            // Cycle through the configured priority levels.
            let priority = stream_config
                .priority_levels
                .get(i % stream_config.priority_levels.len())
                .copied()
                .unwrap_or(0);
            let stream = GpuComputeStream {
                stream_id: i,
                device_id,
                // Synthetic handle; no driver stream is created here.
                stream_handle: device_id as usize * 1000 + i,
                priority,
                current_operation: Arc::new(Mutex::new(None)),
                operation_history: Arc::new(RwLock::new(VecDeque::new())),
                utilization: Arc::new(RwLock::new(StreamUtilization::default())),
            };
            streams.push(stream);
        }
        Ok(streams)
    }

    /// Spawns the long-running worker tasks: queue processor, performance
    /// monitor, memory manager, and (multi-GPU only) the load balancer.
    async fn start_background_workers(&self) -> Result<()> {
        let span = span!(Level::DEBUG, "start_background_workers");
        let _enter = span.enter();
        self.start_operation_processor().await?;
        self.start_performance_monitor().await?;
        self.start_memory_manager().await?;
        if self.gpu_config.enable_multi_gpu {
            self.start_load_balancer().await?;
        }
        debug!("Started background worker tasks");
        Ok(())
    }

    /// Spawns a detached task that drains the work queue forever, sleeping
    /// ~1 ms between polls.
    async fn start_operation_processor(&self) -> Result<()> {
        let work_queue = Arc::clone(&self.work_queue);
        let compute_streams = Arc::clone(&self.compute_streams);
        let stats = Arc::clone(&self.stats);
        let gpu_config = self.gpu_config.clone();
        tokio::spawn(async move {
            loop {
                // The inner block scopes the mutex guard so it is released
                // before the `.await`s below.
                if let Some(operation) = {
                    let mut queue = work_queue.lock().expect("lock poisoned");
                    queue.pop_front()
                } {
                    if let Err(e) = Self::process_gpu_operation(
                        operation,
                        &compute_streams,
                        &stats,
                        &gpu_config,
                    )
                    .await
                    {
                        error!("Failed to process GPU operation: {}", e);
                    }
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
            }
        });
        Ok(())
    }

    /// Runs one queued operation: picks a stream, executes, sends the result
    /// to the waiting caller (if any), then records stats.
    async fn process_gpu_operation(
        mut operation: GpuOperation,
        compute_streams: &Arc<RwLock<HashMap<i32, Vec<GpuComputeStream>>>>,
        stats: &Arc<RwLock<GpuPerformanceStats>>,
        gpu_config: &FaissGpuConfig,
    ) -> Result<()> {
        let start_time = Instant::now();
        let (device_id, stream_id) =
            Self::select_optimal_stream(compute_streams, &operation).await?;
        // Take the sender out first so the operation can be moved below.
        let result_sender = operation.result_sender.take();
        let result =
            Self::execute_operation_on_device(operation, device_id, stream_id, gpu_config).await?;
        if let Some(sender) = result_sender {
            // The receiver may already be gone; ignore the send error.
            let _ = sender.send(result.clone());
        }
        Self::update_operation_stats(stats, &result, start_time.elapsed()).await?;
        Ok(())
    }

    /// Picks the (device, stream) pair with the lowest recorded utilization.
    ///
    /// NOTE(review): defaults to (0, 0) when the map is empty, even if 0 is
    /// not a configured device id — confirm intended.
    async fn select_optimal_stream(
        compute_streams: &Arc<RwLock<HashMap<i32, Vec<GpuComputeStream>>>>,
        _operation: &GpuOperation,
    ) -> Result<(i32, usize)> {
        let streams = compute_streams.read().expect("lock poisoned");
        let mut best_device = 0;
        let mut best_stream = 0;
        let mut lowest_utilization = f32::MAX;
        for (&device_id, device_streams) in streams.iter() {
            for (stream_id, stream) in device_streams.iter().enumerate() {
                let utilization = stream
                    .utilization
                    .read()
                    .expect("lock poisoned")
                    .utilization_percentage;
                if utilization < lowest_utilization {
                    lowest_utilization = utilization;
                    best_device = device_id;
                    best_stream = stream_id;
                }
            }
        }
        Ok((best_device, best_stream))
    }

    /// Simulated execution: each operation type sleeps for a fixed duration
    /// and returns fabricated results — no real GPU work happens here.
    async fn execute_operation_on_device(
        operation: GpuOperation,
        _device_id: i32,
        _stream_id: usize,
        _gpu_config: &FaissGpuConfig,
    ) -> Result<GpuOperationResult> {
        let start_time = Instant::now();
        let result_data = match &operation.operation_type {
            GpuOperationType::Search {
                query_vectors, k, ..
            } => {
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                // Fabricate k hits per query with descending scores.
                let mut results = Vec::new();
                for _query in query_vectors {
                    let mut query_results = Vec::new();
                    for i in 0..*k {
                        query_results.push((format!("gpu_result_{i}"), 0.95 - (i as f32 * 0.05)));
                    }
                    results.push(query_results);
                }
                GpuResultData::SearchResults(results)
            }
            GpuOperationType::Add { vectors: _, .. } => {
                tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
                GpuResultData::AdditionComplete
            }
            GpuOperationType::Train { .. } => {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                GpuResultData::TrainingComplete
            }
            GpuOperationType::Optimize => {
                tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
                let mut metrics = HashMap::new();
                metrics.insert("optimization_improvement".to_string(), 15.0);
                metrics.insert("memory_efficiency".to_string(), 92.0);
                GpuResultData::OptimizationMetrics(metrics)
            }
            GpuOperationType::MemoryTransfer { size, .. } => {
                // Model a 10 GiB/s transfer rate.
                let transfer_time = *size as f64 / (10.0 * 1024.0 * 1024.0 * 1024.0);
                tokio::time::sleep(tokio::time::Duration::from_secs_f64(transfer_time)).await;
                GpuResultData::TransferComplete
            }
        };
        Ok(GpuOperationResult {
            operation_id: operation.id,
            success: true,
            result_data,
            execution_time: start_time.elapsed(),
            // Placeholder figure — not measured.
            memory_used: 1024 * 1024,
            error_message: None,
        })
    }

    /// Folds one finished operation into the shared stats.
    ///
    /// NOTE(review): `operations_per_second` is accumulated and never reset
    /// or averaged, so it grows without bound — confirm intended semantics.
    async fn update_operation_stats(
        stats: &Arc<RwLock<GpuPerformanceStats>>,
        result: &GpuOperationResult,
        execution_time: Duration,
    ) -> Result<()> {
        let mut stats = stats.write().expect("lock poisoned");
        stats.throughput.operations_per_second += 1.0 / execution_time.as_secs_f64();
        if !result.success {
            stats.error_stats.total_errors += 1;
        }
        Ok(())
    }

    /// Spawns a detached task that refreshes device metrics once per second.
    async fn start_performance_monitor(&self) -> Result<()> {
        let stats = Arc::clone(&self.stats);
        let device_ids = self.gpu_config.device_ids.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
            loop {
                interval.tick().await;
                if let Err(e) = Self::collect_performance_metrics(&stats, &device_ids).await {
                    warn!("Failed to collect performance metrics: {}", e);
                }
            }
        });
        Ok(())
    }

    /// Writes synthetic per-device metrics into `stats` (values are derived
    /// from the device id; no hardware is queried) and recomputes the mean
    /// utilization across devices.
    async fn collect_performance_metrics(
        stats: &Arc<RwLock<GpuPerformanceStats>>,
        device_ids: &[i32],
    ) -> Result<()> {
        let mut stats = stats.write().expect("lock poisoned");
        for &device_id in device_ids {
            let device_stats = DeviceStats {
                utilization: 75.0 + (device_id as f32 * 5.0) % 25.0,
                memory_usage: MemoryUsageStats {
                    total_memory: 8 * 1024 * 1024 * 1024,
                    used_memory: 6 * 1024 * 1024 * 1024,
                    free_memory: 2 * 1024 * 1024 * 1024,
                    peak_usage: 7 * 1024 * 1024 * 1024,
                    fragmentation: 5.0,
                },
                compute_performance: ComputePerformanceStats {
                    flops: 15.5e12,
                    memory_bandwidth_utilization: 80.0,
                    kernel_efficiency: 85.0,
                    occupancy: 75.0,
                },
                power_consumption: 250.0,
                temperature: 70.0,
            };
            stats.device_stats.insert(device_id, device_stats);
        }
        // Mean utilization across all devices seen so far.
        stats.overall_utilization = stats
            .device_stats
            .values()
            .map(|s| s.utilization)
            .sum::<f32>()
            / stats.device_stats.len() as f32;
        Ok(())
    }

    /// Spawns a detached task that audits the memory pools every 5 seconds.
    async fn start_memory_manager(&self) -> Result<()> {
        let memory_pools = Arc::clone(&self.memory_pools);
        let gpu_config = self.gpu_config.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
            loop {
                interval.tick().await;
                if let Err(e) = Self::manage_gpu_memory(&memory_pools, &gpu_config).await {
                    warn!("Failed to manage GPU memory: {}", e);
                }
            }
        });
        Ok(())
    }

    /// One audit pass: logs high fragmentation and flags blocks live for
    /// over an hour as potential leaks. Read-only — nothing is compacted or
    /// freed here.
    async fn manage_gpu_memory(
        memory_pools: &Arc<RwLock<HashMap<i32, FaissGpuMemoryPool>>>,
        _gpu_config: &FaissGpuConfig,
    ) -> Result<()> {
        let pools = memory_pools.read().expect("lock poisoned");
        for (device_id, pool) in pools.iter() {
            let fragmentation = pool.calculate_fragmentation();
            if fragmentation > 20.0 {
                debug!(
                    "High fragmentation detected on device {}: {:.1}%",
                    device_id, fragmentation
                );
            }
            let allocated_blocks = pool.allocated_blocks.read().expect("lock poisoned");
            let now = Instant::now();
            for (_, block) in allocated_blocks.iter() {
                // Anything held longer than an hour is treated as suspect.
                if now.duration_since(block.allocated_at) > Duration::from_secs(3600) {
                    warn!(
                        "Potential memory leak detected on device {}: block allocated {} ago",
                        device_id,
                        humantime::format_duration(now.duration_since(block.allocated_at))
                    );
                }
            }
        }
        Ok(())
    }

    /// Spawns a detached task that refreshes load-balancer state every 2
    /// seconds. Only started when `enable_multi_gpu` is set.
    async fn start_load_balancer(&self) -> Result<()> {
        let load_balancer = Arc::clone(&self.load_balancer);
        let stats = Arc::clone(&self.stats);
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(2));
            loop {
                interval.tick().await;
                if let Err(e) = Self::update_load_balancing(&load_balancer, &stats).await {
                    warn!("Failed to update load balancing: {}", e);
                }
            }
        });
        Ok(())
    }

    /// Copies the latest device stats into the balancer and appends a
    /// bounded (100-entry) history snapshot per device.
    ///
    /// Lock order: stats (read) before balancer (write); no `.await` occurs
    /// while either guard is held.
    async fn update_load_balancing(
        load_balancer: &Arc<RwLock<GpuLoadBalancer>>,
        stats: &Arc<RwLock<GpuPerformanceStats>>,
    ) -> Result<()> {
        let stats = stats.read().expect("lock poisoned");
        let mut balancer = load_balancer.write().expect("lock poisoned");
        for (&device_id, device_stats) in &stats.device_stats {
            balancer
                .device_utilization
                .insert(device_id, device_stats.utilization);
            let snapshot = PerformanceSnapshot {
                timestamp: Instant::now(),
                utilization: device_stats.utilization,
                memory_usage: device_stats.memory_usage.used_memory as f32
                    / device_stats.memory_usage.total_memory as f32
                    * 100.0,
                // Placeholder throughput/latency figures — not measured.
                ops_per_second: 1000.0,
                avg_latency: Duration::from_micros(250),
            };
            balancer
                .performance_history
                .entry(device_id)
                .or_default()
                .push_back(snapshot);
            // Cap history at 100 snapshots per device.
            if balancer.performance_history[&device_id].len() > 100 {
                balancer
                    .performance_history
                    .get_mut(&device_id)
                    .expect("device_id should exist in performance_history")
                    .pop_front();
            }
        }
        Ok(())
    }

    /// Queues a k-NN search and awaits its result: one `(id, score)` list
    /// per query vector.
    ///
    /// NOTE(review): the operation's `timeout` field is recorded but not
    /// enforced anywhere visible; the "GPU operation timeout" error actually
    /// fires when the worker drops the sender without replying — confirm.
    pub async fn search_gpu(
        &self,
        query_vectors: Vec<Vec<f32>>,
        k: usize,
        search_params: FaissSearchParams,
    ) -> Result<Vec<Vec<(String, f32)>>> {
        let span = span!(Level::DEBUG, "search_gpu");
        let _enter = span.enter();
        let (result_sender, result_receiver) = oneshot::channel();
        let operation = GpuOperation {
            id: uuid::Uuid::new_v4().to_string(),
            operation_type: GpuOperationType::Search {
                query_vectors: query_vectors.clone(),
                k,
                search_params,
            },
            input_data: GpuOperationData::Vectors(query_vectors),
            output_size: k * std::mem::size_of::<(String, f32)>(),
            priority: 1,
            timeout: Some(Duration::from_secs(30)),
            result_sender: Some(result_sender),
        };
        // Enqueue inside a scope so the lock drops before awaiting.
        {
            let mut queue = self.work_queue.lock().expect("lock poisoned");
            queue.push_back(operation);
        }
        let result = result_receiver
            .await
            .map_err(|_| AnyhowError::msg("GPU operation timeout"))?;
        if !result.success {
            return Err(AnyhowError::msg(
                result
                    .error_message
                    .unwrap_or_else(|| "GPU operation failed".to_string()),
            ));
        }
        match result.result_data {
            GpuResultData::SearchResults(results) => Ok(results),
            _ => Err(AnyhowError::msg("Unexpected result type")),
        }
    }

    /// Queues an add operation (vectors plus their ids) and awaits
    /// completion.
    pub async fn add_vectors_gpu(&self, vectors: Vec<Vec<f32>>, ids: Vec<String>) -> Result<()> {
        let span = span!(Level::DEBUG, "add_vectors_gpu");
        let _enter = span.enter();
        let (result_sender, result_receiver) = oneshot::channel();
        let operation = GpuOperation {
            id: uuid::Uuid::new_v4().to_string(),
            operation_type: GpuOperationType::Add {
                vectors: vectors.clone(),
                ids,
            },
            input_data: GpuOperationData::Vectors(vectors),
            output_size: 0,
            priority: 2,
            timeout: Some(Duration::from_secs(60)),
            result_sender: Some(result_sender),
        };
        // Enqueue inside a scope so the lock drops before awaiting.
        {
            let mut queue = self.work_queue.lock().expect("lock poisoned");
            queue.push_back(operation);
        }
        let result = result_receiver
            .await
            .map_err(|_| AnyhowError::msg("GPU operation timeout"))?;
        if !result.success {
            return Err(AnyhowError::msg(
                result
                    .error_message
                    .unwrap_or_else(|| "GPU operation failed".to_string()),
            ));
        }
        Ok(())
    }

    /// Returns a clone of the current performance statistics.
    pub fn get_gpu_stats(&self) -> Result<GpuPerformanceStats> {
        let stats = self.stats.read().expect("lock poisoned");
        Ok(stats.clone())
    }

    /// Queues an optimization pass and returns the metrics it reports.
    pub async fn optimize_gpu_performance(&self) -> Result<HashMap<String, f64>> {
        let span = span!(Level::INFO, "optimize_gpu_performance");
        let _enter = span.enter();
        let (result_sender, result_receiver) = oneshot::channel();
        let operation = GpuOperation {
            id: uuid::Uuid::new_v4().to_string(),
            operation_type: GpuOperationType::Optimize,
            input_data: GpuOperationData::Empty,
            output_size: 0,
            priority: 0,
            timeout: Some(Duration::from_secs(120)),
            result_sender: Some(result_sender),
        };
        // Enqueue inside a scope so the lock drops before awaiting.
        {
            let mut queue = self.work_queue.lock().expect("lock poisoned");
            queue.push_back(operation);
        }
        let result = result_receiver
            .await
            .map_err(|_| AnyhowError::msg("GPU optimization timeout"))?;
        if !result.success {
            return Err(AnyhowError::msg("GPU optimization failed"));
        }
        match result.result_data {
            GpuResultData::OptimizationMetrics(metrics) => Ok(metrics),
            _ => Err(AnyhowError::msg("Unexpected result type")),
        }
    }
}
impl FaissGpuMemoryPool {
    /// Creates an empty pool of `total_size` bytes for `device_id`.
    pub fn new(device_id: i32, total_size: usize) -> Result<Self> {
        Ok(Self {
            device_id,
            total_size,
            allocated_size: AtomicUsize::new(0),
            free_blocks: Arc::new(Mutex::new(BTreeMap::new())),
            allocated_blocks: Arc::new(RwLock::new(HashMap::new())),
            allocation_stats: Arc::new(RwLock::new(AllocationStatistics::default())),
        })
    }

    /// Reserves a 256-byte-aligned block of at least `size` bytes.
    ///
    /// Returns an error (and counts an OOM event) when the pool's budget
    /// would be exceeded.
    pub fn allocate(&self, size: usize, block_type: MemoryBlockType) -> Result<GpuMemoryBlock> {
        // Round the request up to a 256-byte boundary, guarding against
        // overflow for pathological sizes.
        let aligned_size = size
            .checked_add(255)
            .ok_or_else(|| AnyhowError::msg("Allocation size overflow"))?
            & !255;
        // Reserve atomically with a CAS loop. The previous check-then-add
        // (separate `load` then `fetch_add`) let two concurrent callers both
        // pass the capacity check and overshoot `total_size`.
        let reserve = self
            .allocated_size
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                current
                    .checked_add(aligned_size)
                    .filter(|&next| next <= self.total_size)
            });
        let previous = match reserve {
            Ok(previous) => previous,
            Err(_) => {
                // Record the exhaustion before reporting it.
                let mut stats = self.allocation_stats.write().expect("lock poisoned");
                stats.oom_events += 1;
                return Err(AnyhowError::msg("Out of GPU memory"));
            }
        };
        let block = GpuMemoryBlock {
            // Placeholder address: the byte offset at which this reservation
            // started (no real device pointer is produced in this file).
            gpu_address: previous,
            size: aligned_size,
            allocated_at: Instant::now(),
            ref_count: AtomicUsize::new(1),
            block_type,
        };
        {
            let mut stats = self.allocation_stats.write().expect("lock poisoned");
            stats.total_allocations += 1;
            // Peak is derived from this reservation's own high-water mark.
            let current_usage = previous + aligned_size;
            if current_usage > stats.peak_usage {
                stats.peak_usage = current_usage;
            }
        }
        Ok(block)
    }

    /// Releases a block's reservation and counts the deallocation.
    pub fn deallocate(&self, block: &GpuMemoryBlock) -> Result<()> {
        // Saturating update guards against a double-free driving the counter
        // below zero (usize wrap-around).
        let _ = self
            .allocated_size
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                Some(current.saturating_sub(block.size))
            });
        {
            let mut stats = self.allocation_stats.write().expect("lock poisoned");
            stats.total_deallocations += 1;
        }
        Ok(())
    }

    /// Heuristic fragmentation percentage: free-block count relative to live
    /// allocation (in KiB). Returns 0 for an empty pool.
    pub fn calculate_fragmentation(&self) -> f32 {
        let allocated = self.allocated_size.load(Ordering::Relaxed);
        if allocated == 0 {
            return 0.0;
        }
        let num_free_blocks = self.free_blocks.lock().expect("lock poisoned").len();
        // Clamp the divisor: the previous `allocated / 1024` was zero (and
        // produced inf) whenever under 1 KiB was live.
        let allocated_kib = (allocated / 1024).max(1);
        (num_free_blocks as f32 / allocated_kib as f32) * 100.0
    }
}
impl GpuLoadBalancer {
pub fn new(device_ids: &[i32], strategy: LoadBalancingStrategy) -> Self {
let mut device_utilization = HashMap::new();
let mut workload_distribution = HashMap::new();
let mut performance_history = HashMap::new();
for &device_id in device_ids {
device_utilization.insert(device_id, 0.0);
workload_distribution.insert(device_id, 0);
performance_history.insert(device_id, VecDeque::new());
}
Self {
device_utilization,
workload_distribution,
strategy,
performance_history,
}
}
pub fn select_device(&self, operation: &GpuOperation) -> i32 {
match self.strategy {
LoadBalancingStrategy::RoundRobin => self.select_round_robin(),
LoadBalancingStrategy::LoadBased => self.select_load_based(),
LoadBalancingStrategy::PerformanceBased => self.select_performance_based(),
LoadBalancingStrategy::MemoryAware => self.select_memory_aware(),
LoadBalancingStrategy::Hybrid => self.select_hybrid(operation),
}
}
fn select_round_robin(&self) -> i32 {
let total_workload: usize = self.workload_distribution.values().sum();
let device_count = self.device_utilization.len();
let target_device_index = total_workload % device_count;
*self
.device_utilization
.keys()
.nth(target_device_index)
.unwrap_or(&0)
}
fn select_load_based(&self) -> i32 {
self.device_utilization
.iter()
.min_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
.map(|(&device_id, _)| device_id)
.unwrap_or(0)
}
fn select_performance_based(&self) -> i32 {
let mut best_device = 0;
let mut best_score = f64::MIN;
for (&device_id, history) in &self.performance_history {
if let Some(recent_snapshot) = history.back() {
let score = recent_snapshot.ops_per_second
/ (recent_snapshot.avg_latency.as_secs_f64() + 1e-6);
if score > best_score {
best_score = score;
best_device = device_id;
}
}
}
best_device
}
fn select_memory_aware(&self) -> i32 {
self.device_utilization
.iter()
.min_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
.map(|(&device_id, _)| device_id)
.unwrap_or(0)
}
fn select_hybrid(&self, operation: &GpuOperation) -> i32 {
match &operation.operation_type {
GpuOperationType::Search { .. } => self.select_performance_based(),
GpuOperationType::Add { .. } => self.select_memory_aware(),
GpuOperationType::Train { .. } => self.select_load_based(),
_ => self.select_round_robin(),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    /// Construction smoke test: default configs build an index and start its
    /// background workers without error.
    #[tokio::test]
    async fn test_faiss_gpu_index_creation() {
        let faiss_config = FaissConfig::default();
        let gpu_config = FaissGpuConfig::default();
        let result = FaissGpuIndex::new(faiss_config, gpu_config).await;
        assert!(result.is_ok());
    }
    /// Allocate/deallocate round-trip: 1024 is already 256-aligned so the
    /// block size is unchanged, and the counter returns to zero after free.
    #[test]
    fn test_gpu_memory_pool() -> Result<()> {
        let pool = FaissGpuMemoryPool::new(0, 1024 * 1024)?;
        let block = pool.allocate(1024, MemoryBlockType::Vectors)?;
        assert_eq!(block.size, 1024);
        pool.deallocate(&block)?;
        assert_eq!(pool.allocated_size.load(Ordering::Relaxed), 0);
        Ok(())
    }
    /// Round-robin balancer tracks all devices and always returns one of the
    /// configured ids.
    #[test]
    fn test_gpu_load_balancer() {
        let device_ids = vec![0, 1, 2];
        let balancer = GpuLoadBalancer::new(&device_ids, LoadBalancingStrategy::RoundRobin);
        assert_eq!(balancer.device_utilization.len(), 3);
        // Fire-and-forget operation: no result channel attached.
        let operation = GpuOperation {
            id: "test".to_string(),
            operation_type: GpuOperationType::Optimize,
            input_data: GpuOperationData::Empty,
            output_size: 0,
            priority: 0,
            timeout: None,
            result_sender: None,
        };
        let selected_device = balancer.select_device(&operation);
        assert!(device_ids.contains(&selected_device));
    }
}