use crate::{
    faiss_integration::{FaissConfig, FaissSearchParams},
    gpu::GpuExecutionConfig,
};
use anyhow::{Error as AnyhowError, Result};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc, Mutex, RwLock,
};
use std::time::{Duration, Instant};
use tokio::sync::oneshot;
use tracing::{debug, error, info, span, warn, Level};

/// Configuration for GPU-accelerated FAISS indexing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FaissGpuConfig {
    /// CUDA device IDs to use.
    pub device_ids: Vec<i32>,
    /// Memory budget per device, in bytes.
    pub memory_per_device: usize,
    /// Whether to distribute work across multiple GPUs.
    pub enable_multi_gpu: bool,
    /// How GPU memory is allocated and reused.
    pub memory_strategy: GpuMemoryStrategy,
    /// Compute stream configuration.
    pub stream_config: GpuStreamConfig,
    /// Kernel and batching optimizations.
    pub optimization: GpuOptimizationConfig,
    /// Error handling and recovery behavior.
    pub error_handling: GpuErrorConfig,
}

/// Strategy for managing GPU memory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GpuMemoryStrategy {
    FixedPool,
    Dynamic,
    Unified,
    Streaming { chunk_size: usize },
}

/// Configuration for GPU compute streams.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuStreamConfig {
    pub streams_per_device: usize,
    pub enable_overlapping: bool,
    pub priority_levels: Vec<i32>,
    pub sync_strategy: SyncStrategy,
}

/// How host code synchronizes with GPU streams.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SyncStrategy {
    Blocking,
    NonBlocking,
    EventBased,
    Cooperative,
}

/// Kernel-level optimization switches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuOptimizationConfig {
    pub enable_tensor_cores: bool,
    pub enable_mixed_precision: bool,
    pub enable_coalescing: bool,
    pub enable_kernel_fusion: bool,
    pub cache_config: GpuCacheConfig,
    pub batch_optimization: BatchOptimizationConfig,
}

/// On-chip cache preferences.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuCacheConfig {
    pub l1_cache_config: CacheConfig,
    pub shared_memory_config: CacheConfig,
    pub enable_prefetching: bool,
    pub cache_line_size: usize,
}

/// L1 / shared-memory split preference.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CacheConfig {
    PreferL1,
    PreferShared,
    Equal,
    Disabled,
}

/// Batching behavior for GPU operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOptimizationConfig {
    pub optimal_batch_sizes: HashMap<String, usize>,
    pub enable_dynamic_batching: bool,
    pub coalescence_threshold: usize,
    pub max_batch_size: usize,
}

/// Error handling and recovery settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuErrorConfig {
    pub enable_auto_recovery: bool,
    pub max_retries: usize,
    pub fallback_to_cpu: bool,
    pub error_logging_level: String,
}

impl Default for FaissGpuConfig {
    fn default() -> Self {
        Self {
            device_ids: vec![0],
            memory_per_device: 2 * 1024 * 1024 * 1024, // 2 GiB per device
            enable_multi_gpu: false,
            memory_strategy: GpuMemoryStrategy::Dynamic,
            stream_config: GpuStreamConfig {
                streams_per_device: 4,
                enable_overlapping: true,
                priority_levels: vec![0, 1, 2],
                sync_strategy: SyncStrategy::NonBlocking,
            },
            optimization: GpuOptimizationConfig {
                enable_tensor_cores: true,
                enable_mixed_precision: true,
                enable_coalescing: true,
                enable_kernel_fusion: true,
                cache_config: GpuCacheConfig {
                    l1_cache_config: CacheConfig::PreferL1,
                    shared_memory_config: CacheConfig::PreferShared,
                    enable_prefetching: true,
                    cache_line_size: 128,
                },
                batch_optimization: BatchOptimizationConfig {
                    optimal_batch_sizes: {
                        let mut sizes = HashMap::new();
                        sizes.insert("search".to_string(), 1024);
                        sizes.insert("add".to_string(), 512);
                        sizes.insert("train".to_string(), 256);
                        sizes
                    },
                    enable_dynamic_batching: true,
                    coalescence_threshold: 64,
                    max_batch_size: 4096,
                },
            },
            error_handling: GpuErrorConfig {
                enable_auto_recovery: true,
                max_retries: 3,
                fallback_to_cpu: true,
                error_logging_level: "warn".to_string(),
            },
        }
    }
}

/// GPU-accelerated FAISS index that schedules operations across devices,
/// compute streams, and per-device memory pools.
pub struct FaissGpuIndex {
    faiss_config: FaissConfig,
    gpu_config: FaissGpuConfig,
    gpu_runtime: Arc<GpuExecutionConfig>,
    memory_pools: Arc<RwLock<HashMap<i32, FaissGpuMemoryPool>>>,
    compute_streams: Arc<RwLock<HashMap<i32, Vec<GpuComputeStream>>>>,
    stats: Arc<RwLock<GpuPerformanceStats>>,
    work_queue: Arc<Mutex<VecDeque<GpuOperation>>>,
    results_cache: Arc<RwLock<HashMap<String, CachedResult>>>,
    load_balancer: Arc<RwLock<GpuLoadBalancer>>,
}

/// Per-device memory pool used to track allocations.
#[derive(Debug)]
pub struct FaissGpuMemoryPool {
    pub device_id: i32,
    pub total_size: usize,
    pub allocated_size: AtomicUsize,
    pub free_blocks: Arc<Mutex<BTreeMap<usize, Vec<GpuMemoryBlock>>>>,
    pub allocated_blocks: Arc<RwLock<HashMap<usize, GpuMemoryBlock>>>,
    pub allocation_stats: Arc<RwLock<AllocationStatistics>>,
}

/// A single allocation within a device memory pool.
#[derive(Debug)]
pub struct GpuMemoryBlock {
    pub gpu_address: usize,
    pub size: usize,
    pub allocated_at: Instant,
    pub ref_count: AtomicUsize,
    pub block_type: MemoryBlockType,
}

impl Clone for GpuMemoryBlock {
    fn clone(&self) -> Self {
        Self {
            gpu_address: self.gpu_address,
            size: self.size,
            allocated_at: self.allocated_at,
            ref_count: AtomicUsize::new(self.ref_count.load(Ordering::Relaxed)),
            block_type: self.block_type,
        }
    }
}

/// What a memory block is used for.
#[derive(Debug, Clone, Copy)]
pub enum MemoryBlockType {
    Vectors,
    IndexData,
    Temporary,
    Results,
}

/// A logical compute stream bound to one device.
#[derive(Debug)]
pub struct GpuComputeStream {
    pub stream_id: usize,
    pub device_id: i32,
    pub stream_handle: usize,
    pub priority: i32,
    pub current_operation: Arc<Mutex<Option<GpuOperation>>>,
    pub operation_history: Arc<RwLock<VecDeque<CompletedOperation>>>,
    pub utilization: Arc<RwLock<StreamUtilization>>,
}

#[derive(Debug)]
pub struct GpuOperation {
    pub id: String,
    pub operation_type: GpuOperationType,
    pub input_data: GpuOperationData,
    pub output_size: usize,
    pub priority: i32,
    pub timeout: Option<Duration>,
    pub result_sender: Option<oneshot::Sender<GpuOperationResult>>,
}

#[derive(Debug, Clone)]
pub enum GpuOperationType {
    Search {
        query_vectors: Vec<Vec<f32>>,
        k: usize,
        search_params: FaissSearchParams,
    },
    Add {
        vectors: Vec<Vec<f32>>,
        ids: Vec<String>,
    },
    Train { training_vectors: Vec<Vec<f32>> },
    Optimize,
    MemoryTransfer {
        source: TransferSource,
        destination: TransferDestination,
        size: usize,
    },
}

#[derive(Debug, Clone)]
pub enum GpuOperationData {
    Vectors(Vec<Vec<f32>>),
    IndexData(Vec<u8>),
    QueryParams(HashMap<String, Vec<u8>>),
    Empty,
}

#[derive(Debug, Clone)]
pub enum TransferSource {
    CpuMemory(Vec<u8>),
    GpuMemory { device_id: i32, address: usize },
    Disk(std::path::PathBuf),
}

#[derive(Debug, Clone)]
pub enum TransferDestination {
    CpuMemory,
    GpuMemory { device_id: i32, address: usize },
    Disk(std::path::PathBuf),
}

#[derive(Debug, Clone)]
pub struct GpuOperationResult {
    pub operation_id: String,
    pub success: bool,
    pub result_data: GpuResultData,
    pub execution_time: Duration,
    pub memory_used: usize,
    pub error_message: Option<String>,
}

#[derive(Debug, Clone)]
pub enum GpuResultData {
    SearchResults(Vec<Vec<(String, f32)>>),
    TrainingComplete,
    AdditionComplete,
    OptimizationMetrics(HashMap<String, f64>),
    TransferComplete,
    Error(String),
}

#[derive(Debug, Clone)]
pub struct CompletedOperation {
    pub operation_id: String,
    pub operation_type: String,
    pub start_time: Instant,
    pub end_time: Instant,
    pub success: bool,
    pub memory_used: usize,
}

#[derive(Debug, Clone, Default)]
pub struct StreamUtilization {
    pub total_operations: usize,
    pub total_execution_time: Duration,
    pub avg_execution_time: Duration,
    pub utilization_percentage: f32,
    pub idle_time: Duration,
}

#[derive(Debug, Clone, Default)]
pub struct GpuPerformanceStats {
    pub device_stats: HashMap<i32, DeviceStats>,
    pub overall_utilization: f32,
    pub memory_efficiency: f32,
    pub throughput: ThroughputMetrics,
    pub error_stats: ErrorStatistics,
    pub performance_trends: PerformanceTrends,
}

#[derive(Debug, Clone, Default)]
pub struct DeviceStats {
    pub utilization: f32,
    pub memory_usage: MemoryUsageStats,
    pub compute_performance: ComputePerformanceStats,
    pub power_consumption: f32,
    pub temperature: f32,
}

#[derive(Debug, Clone, Default)]
pub struct MemoryUsageStats {
    pub total_memory: usize,
    pub used_memory: usize,
    pub free_memory: usize,
    pub peak_usage: usize,
    pub fragmentation: f32,
}

#[derive(Debug, Clone, Default)]
pub struct ComputePerformanceStats {
    pub flops: f64,
    pub memory_bandwidth_utilization: f32,
    pub kernel_efficiency: f32,
    pub occupancy: f32,
}

#[derive(Debug, Clone, Default)]
pub struct ThroughputMetrics {
    pub vectors_per_second: f64,
    pub operations_per_second: f64,
    pub transfer_rate_mbps: f64,
    pub search_qps: f64,
}

#[derive(Debug, Clone, Default)]
pub struct ErrorStatistics {
    pub total_errors: usize,
    pub recoverable_errors: usize,
    pub fatal_errors: usize,
    pub error_rate: f32,
    pub recovery_rate: f32,
}

#[derive(Debug, Clone, Default)]
pub struct PerformanceTrends {
    pub utilization_trend: Vec<(Instant, f32)>,
    pub throughput_trend: Vec<(Instant, f64)>,
    pub memory_trend: Vec<(Instant, usize)>,
    pub error_trend: Vec<(Instant, f32)>,
}

#[derive(Debug, Clone, Default)]
pub struct AllocationStatistics {
    pub total_allocations: usize,
    pub total_deallocations: usize,
    pub peak_usage: usize,
    pub avg_allocation_size: usize,
    pub fragmentation_events: usize,
    pub oom_events: usize,
}

#[derive(Debug)]
pub struct CachedResult {
    pub data: GpuResultData,
    pub timestamp: Instant,
    pub hit_count: AtomicUsize,
    pub size: usize,
}

impl Clone for CachedResult {
    fn clone(&self) -> Self {
        Self {
            data: self.data.clone(),
            timestamp: self.timestamp,
            hit_count: AtomicUsize::new(self.hit_count.load(Ordering::Acquire)),
            size: self.size,
        }
    }
}

#[derive(Debug)]
pub struct GpuLoadBalancer {
    pub device_utilization: HashMap<i32, f32>,
    pub workload_distribution: HashMap<i32, usize>,
    pub strategy: LoadBalancingStrategy,
    pub performance_history: HashMap<i32, VecDeque<PerformanceSnapshot>>,
}

#[derive(Debug, Clone)]
pub enum LoadBalancingStrategy {
    RoundRobin,
    LoadBased,
    PerformanceBased,
    MemoryAware,
    Hybrid,
}

#[derive(Debug, Clone)]
pub struct PerformanceSnapshot {
    pub timestamp: Instant,
    pub utilization: f32,
    pub memory_usage: f32,
    pub ops_per_second: f64,
    pub avg_latency: Duration,
}

impl FaissGpuIndex {
    /// Creates a new GPU-accelerated FAISS index and starts its background workers.
    pub async fn new(faiss_config: FaissConfig, gpu_config: FaissGpuConfig) -> Result<Self> {
        let span = span!(Level::INFO, "faiss_gpu_index_new");
        let _enter = span.enter();

        // Base GPU runtime configuration derived from the FAISS GPU settings.
        let _base_gpu_config = crate::gpu::GpuConfig {
            device_id: gpu_config.device_ids.first().copied().unwrap_or(0),
            enable_mixed_precision: true,
            enable_tensor_cores: true,
            batch_size: 1024,
            memory_pool_size: gpu_config.memory_per_device,
            stream_count: gpu_config.stream_config.streams_per_device,
            enable_peer_access: gpu_config.enable_multi_gpu,
            enable_unified_memory: matches!(gpu_config.memory_strategy, GpuMemoryStrategy::Unified),
            enable_async_execution: true,
            enable_multi_gpu: gpu_config.enable_multi_gpu,
            preferred_gpu_ids: gpu_config.device_ids.clone(),
            dynamic_batch_sizing: true,
            enable_memory_compression: false,
            kernel_cache_size: 1024 * 1024,
            optimization_level: crate::gpu::OptimizationLevel::Performance,
            precision_mode: crate::gpu::PrecisionMode::Mixed,
        };

        let gpu_runtime = Arc::new(GpuExecutionConfig::default());

        // One memory pool per configured device.
        let mut memory_pools = HashMap::new();
        for &device_id in &gpu_config.device_ids {
            let pool = FaissGpuMemoryPool::new(device_id, gpu_config.memory_per_device)?;
            memory_pools.insert(device_id, pool);
        }

        // Compute streams per device.
        let mut compute_streams = HashMap::new();
        for &device_id in &gpu_config.device_ids {
            let streams = Self::create_compute_streams(device_id, &gpu_config.stream_config)?;
            compute_streams.insert(device_id, streams);
        }

        let load_balancer =
            GpuLoadBalancer::new(&gpu_config.device_ids, LoadBalancingStrategy::Hybrid);

        let device_count = gpu_config.device_ids.len();
        let index = Self {
            faiss_config,
            gpu_config,
            gpu_runtime,
            memory_pools: Arc::new(RwLock::new(memory_pools)),
            compute_streams: Arc::new(RwLock::new(compute_streams)),
            stats: Arc::new(RwLock::new(GpuPerformanceStats::default())),
            work_queue: Arc::new(Mutex::new(VecDeque::new())),
            results_cache: Arc::new(RwLock::new(HashMap::new())),
            load_balancer: Arc::new(RwLock::new(load_balancer)),
        };

        index.start_background_workers().await?;

        info!(
            "Created GPU-accelerated FAISS index with {} devices",
            device_count
        );
        Ok(index)
    }

    fn create_compute_streams(
        device_id: i32,
        stream_config: &GpuStreamConfig,
    ) -> Result<Vec<GpuComputeStream>> {
        let mut streams = Vec::new();

        for i in 0..stream_config.streams_per_device {
            // Cycle through the configured priority levels; default to 0 if none are given.
            let priority = if stream_config.priority_levels.is_empty() {
                0
            } else {
                stream_config.priority_levels[i % stream_config.priority_levels.len()]
            };

            let stream = GpuComputeStream {
                stream_id: i,
                device_id,
                stream_handle: device_id as usize * 1000 + i, // Simulated stream handle
                priority,
                current_operation: Arc::new(Mutex::new(None)),
                operation_history: Arc::new(RwLock::new(VecDeque::new())),
                utilization: Arc::new(RwLock::new(StreamUtilization::default())),
            };

            streams.push(stream);
        }

        Ok(streams)
    }

    async fn start_background_workers(&self) -> Result<()> {
        let span = span!(Level::DEBUG, "start_background_workers");
        let _enter = span.enter();

        self.start_operation_processor().await?;

        self.start_performance_monitor().await?;

        self.start_memory_manager().await?;

        if self.gpu_config.enable_multi_gpu {
            self.start_load_balancer().await?;
        }

        debug!("Started background worker tasks");
        Ok(())
    }

    async fn start_operation_processor(&self) -> Result<()> {
        let work_queue = Arc::clone(&self.work_queue);
        let compute_streams = Arc::clone(&self.compute_streams);
        let stats = Arc::clone(&self.stats);
        let gpu_config = self.gpu_config.clone();

        tokio::spawn(async move {
            loop {
                // Pop the next operation inside a block so the queue lock is
                // released before the await below.
                if let Some(operation) = {
                    let mut queue = work_queue.lock().unwrap();
                    queue.pop_front()
                } {
                    if let Err(e) = Self::process_gpu_operation(
                        operation,
                        &compute_streams,
                        &stats,
                        &gpu_config,
                    )
                    .await
                    {
                        error!("Failed to process GPU operation: {}", e);
                    }
                }

                // Avoid busy-spinning when the queue is empty.
                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
            }
        });

        Ok(())
    }

    async fn process_gpu_operation(
        mut operation: GpuOperation,
        compute_streams: &Arc<RwLock<HashMap<i32, Vec<GpuComputeStream>>>>,
        stats: &Arc<RwLock<GpuPerformanceStats>>,
        gpu_config: &FaissGpuConfig,
    ) -> Result<()> {
        let start_time = Instant::now();

        let (device_id, stream_id) =
            Self::select_optimal_stream(compute_streams, &operation).await?;

        let result_sender = operation.result_sender.take();

        let result =
            Self::execute_operation_on_device(operation, device_id, stream_id, gpu_config).await?;

        if let Some(sender) = result_sender {
            let _ = sender.send(result.clone());
        }

        Self::update_operation_stats(stats, &result, start_time.elapsed()).await?;

        Ok(())
    }

    async fn select_optimal_stream(
        compute_streams: &Arc<RwLock<HashMap<i32, Vec<GpuComputeStream>>>>,
        _operation: &GpuOperation,
    ) -> Result<(i32, usize)> {
        let streams = compute_streams.read().unwrap();

        let mut best_device = 0;
        let mut best_stream = 0;
        let mut lowest_utilization = f32::MAX;

        for (&device_id, device_streams) in streams.iter() {
            for (stream_id, stream) in device_streams.iter().enumerate() {
                let utilization = stream.utilization.read().unwrap().utilization_percentage;
                if utilization < lowest_utilization {
                    lowest_utilization = utilization;
                    best_device = device_id;
                    best_stream = stream_id;
                }
            }
        }

        Ok((best_device, best_stream))
    }

    async fn execute_operation_on_device(
        operation: GpuOperation,
        _device_id: i32,
        _stream_id: usize,
        _gpu_config: &FaissGpuConfig,
    ) -> Result<GpuOperationResult> {
        let start_time = Instant::now();

        // Simulated execution: each branch sleeps to approximate GPU latency
        // and returns placeholder results.
        let result_data = match &operation.operation_type {
            GpuOperationType::Search {
                query_vectors, k, ..
            } => {
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

                let mut results = Vec::new();
                for _query in query_vectors {
                    let mut query_results = Vec::new();
                    for i in 0..*k {
                        query_results.push((format!("gpu_result_{i}"), 0.95 - (i as f32 * 0.05)));
                    }
                    results.push(query_results);
                }

                GpuResultData::SearchResults(results)
            }
            GpuOperationType::Add { vectors: _, .. } => {
                tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
                GpuResultData::AdditionComplete
            }
            GpuOperationType::Train { .. } => {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                GpuResultData::TrainingComplete
            }
            GpuOperationType::Optimize => {
                tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
                let mut metrics = HashMap::new();
                metrics.insert("optimization_improvement".to_string(), 15.0);
                metrics.insert("memory_efficiency".to_string(), 92.0);
                GpuResultData::OptimizationMetrics(metrics)
            }
            GpuOperationType::MemoryTransfer { size, .. } => {
                // Assume roughly 10 GiB/s of transfer bandwidth.
                let transfer_time = *size as f64 / (10.0 * 1024.0 * 1024.0 * 1024.0);
                tokio::time::sleep(tokio::time::Duration::from_secs_f64(transfer_time)).await;
                GpuResultData::TransferComplete
            }
        };

        Ok(GpuOperationResult {
            operation_id: operation.id,
            success: true,
            result_data,
            execution_time: start_time.elapsed(),
            memory_used: 1024 * 1024, // Placeholder estimate (1 MiB)
            error_message: None,
        })
    }

    async fn update_operation_stats(
        stats: &Arc<RwLock<GpuPerformanceStats>>,
        result: &GpuOperationResult,
        execution_time: Duration,
    ) -> Result<()> {
        let mut stats = stats.write().unwrap();

        stats.throughput.operations_per_second += 1.0 / execution_time.as_secs_f64();

        if !result.success {
            stats.error_stats.total_errors += 1;
        }

        Ok(())
    }

    async fn start_performance_monitor(&self) -> Result<()> {
        let stats = Arc::clone(&self.stats);
        let device_ids = self.gpu_config.device_ids.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));

            loop {
                interval.tick().await;

                if let Err(e) = Self::collect_performance_metrics(&stats, &device_ids).await {
                    warn!("Failed to collect performance metrics: {}", e);
                }
            }
        });

        Ok(())
    }

    async fn collect_performance_metrics(
        stats: &Arc<RwLock<GpuPerformanceStats>>,
        device_ids: &[i32],
    ) -> Result<()> {
        let mut stats = stats.write().unwrap();

        for &device_id in device_ids {
            // Simulated per-device metrics (no device query is performed here).
            let device_stats = DeviceStats {
                utilization: 75.0 + (device_id as f32 * 5.0) % 25.0,
                memory_usage: MemoryUsageStats {
                    total_memory: 8 * 1024 * 1024 * 1024, // 8 GiB
                    used_memory: 6 * 1024 * 1024 * 1024,  // 6 GiB
                    free_memory: 2 * 1024 * 1024 * 1024,  // 2 GiB
                    peak_usage: 7 * 1024 * 1024 * 1024,   // 7 GiB
                    fragmentation: 5.0,
                },
                compute_performance: ComputePerformanceStats {
                    flops: 15.5e12, // ~15.5 TFLOPS
                    memory_bandwidth_utilization: 80.0,
                    kernel_efficiency: 85.0,
                    occupancy: 75.0,
                },
                power_consumption: 250.0, // Watts
                temperature: 70.0,        // Degrees Celsius
            };

            stats.device_stats.insert(device_id, device_stats);
        }

        stats.overall_utilization = stats
            .device_stats
            .values()
            .map(|s| s.utilization)
            .sum::<f32>()
            / stats.device_stats.len() as f32;

        Ok(())
    }

    async fn start_memory_manager(&self) -> Result<()> {
        let memory_pools = Arc::clone(&self.memory_pools);
        let gpu_config = self.gpu_config.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));

            loop {
                interval.tick().await;

                if let Err(e) = Self::manage_gpu_memory(&memory_pools, &gpu_config).await {
                    warn!("Failed to manage GPU memory: {}", e);
                }
            }
        });

        Ok(())
    }

    async fn manage_gpu_memory(
        memory_pools: &Arc<RwLock<HashMap<i32, FaissGpuMemoryPool>>>,
        _gpu_config: &FaissGpuConfig,
    ) -> Result<()> {
        let pools = memory_pools.read().unwrap();

        for (device_id, pool) in pools.iter() {
            // Report pools with high fragmentation.
            let fragmentation = pool.calculate_fragmentation();
            if fragmentation > 20.0 {
                debug!(
                    "High fragmentation detected on device {}: {:.1}%",
                    device_id, fragmentation
                );
            }

            // Flag blocks held for more than an hour as potential leaks.
            let allocated_blocks = pool.allocated_blocks.read().unwrap();
            let now = Instant::now();
            for (_, block) in allocated_blocks.iter() {
                if now.duration_since(block.allocated_at) > Duration::from_secs(3600) {
                    warn!(
                        "Potential memory leak detected on device {}: block allocated {} ago",
                        device_id,
                        humantime::format_duration(now.duration_since(block.allocated_at))
                    );
                }
            }
        }

        Ok(())
    }

    async fn start_load_balancer(&self) -> Result<()> {
        let load_balancer = Arc::clone(&self.load_balancer);
        let stats = Arc::clone(&self.stats);

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(2));

            loop {
                interval.tick().await;

                if let Err(e) = Self::update_load_balancing(&load_balancer, &stats).await {
                    warn!("Failed to update load balancing: {}", e);
                }
            }
        });

        Ok(())
    }

    async fn update_load_balancing(
        load_balancer: &Arc<RwLock<GpuLoadBalancer>>,
        stats: &Arc<RwLock<GpuPerformanceStats>>,
    ) -> Result<()> {
        let stats = stats.read().unwrap();
        let mut balancer = load_balancer.write().unwrap();

        for (&device_id, device_stats) in &stats.device_stats {
            balancer
                .device_utilization
                .insert(device_id, device_stats.utilization);

            let snapshot = PerformanceSnapshot {
                timestamp: Instant::now(),
                utilization: device_stats.utilization,
                memory_usage: device_stats.memory_usage.used_memory as f32
                    / device_stats.memory_usage.total_memory as f32
                    * 100.0,
                ops_per_second: 1000.0, // Placeholder throughput
                avg_latency: Duration::from_micros(250), // Placeholder latency
            };

            balancer
                .performance_history
                .entry(device_id)
                .or_default()
                .push_back(snapshot);

            // Keep a bounded history of the last 100 snapshots per device.
            if balancer.performance_history[&device_id].len() > 100 {
                balancer
                    .performance_history
                    .get_mut(&device_id)
                    .unwrap()
                    .pop_front();
            }
        }

        Ok(())
    }

    /// Runs a k-nearest-neighbor search by enqueueing a GPU operation and
    /// awaiting its result.
    pub async fn search_gpu(
        &self,
        query_vectors: Vec<Vec<f32>>,
        k: usize,
        search_params: FaissSearchParams,
    ) -> Result<Vec<Vec<(String, f32)>>> {
        let span = span!(Level::DEBUG, "search_gpu");
        let _enter = span.enter();

        let (result_sender, result_receiver) = oneshot::channel();
        let operation = GpuOperation {
            id: uuid::Uuid::new_v4().to_string(),
            operation_type: GpuOperationType::Search {
                query_vectors: query_vectors.clone(),
                k,
                search_params,
            },
            input_data: GpuOperationData::Vectors(query_vectors),
            output_size: k * std::mem::size_of::<(String, f32)>(),
            priority: 1,
            timeout: Some(Duration::from_secs(30)),
            result_sender: Some(result_sender),
        };

        {
            let mut queue = self.work_queue.lock().unwrap();
            queue.push_back(operation);
        }

        let result = result_receiver
            .await
            .map_err(|_| AnyhowError::msg("GPU operation timeout"))?;

        if !result.success {
            return Err(AnyhowError::msg(
                result
                    .error_message
                    .unwrap_or_else(|| "GPU operation failed".to_string()),
            ));
        }

        match result.result_data {
            GpuResultData::SearchResults(results) => Ok(results),
            _ => Err(AnyhowError::msg("Unexpected result type")),
        }
    }

    /// Adds vectors (with their IDs) to the index via the GPU work queue.
    pub async fn add_vectors_gpu(&self, vectors: Vec<Vec<f32>>, ids: Vec<String>) -> Result<()> {
        let span = span!(Level::DEBUG, "add_vectors_gpu");
        let _enter = span.enter();

        let (result_sender, result_receiver) = oneshot::channel();
        let operation = GpuOperation {
            id: uuid::Uuid::new_v4().to_string(),
            operation_type: GpuOperationType::Add {
                vectors: vectors.clone(),
                ids,
            },
            input_data: GpuOperationData::Vectors(vectors),
            output_size: 0,
            priority: 2,
            timeout: Some(Duration::from_secs(60)),
            result_sender: Some(result_sender),
        };

        {
            let mut queue = self.work_queue.lock().unwrap();
            queue.push_back(operation);
        }

        let result = result_receiver
            .await
            .map_err(|_| AnyhowError::msg("GPU operation timeout"))?;

        if !result.success {
            return Err(AnyhowError::msg(
                result
                    .error_message
                    .unwrap_or_else(|| "GPU operation failed".to_string()),
            ));
        }

        Ok(())
    }

    /// Returns a snapshot of the current GPU performance statistics.
    pub fn get_gpu_stats(&self) -> Result<GpuPerformanceStats> {
        let stats = self.stats.read().unwrap();
        Ok(stats.clone())
    }

    /// Requests a GPU-side optimization pass and returns the reported metrics.
    pub async fn optimize_gpu_performance(&self) -> Result<HashMap<String, f64>> {
        let span = span!(Level::INFO, "optimize_gpu_performance");
        let _enter = span.enter();

        let (result_sender, result_receiver) = oneshot::channel();
        let operation = GpuOperation {
            id: uuid::Uuid::new_v4().to_string(),
            operation_type: GpuOperationType::Optimize,
            input_data: GpuOperationData::Empty,
            output_size: 0,
            priority: 0,
            timeout: Some(Duration::from_secs(120)),
            result_sender: Some(result_sender),
        };

        {
            let mut queue = self.work_queue.lock().unwrap();
            queue.push_back(operation);
        }

        let result = result_receiver
            .await
            .map_err(|_| AnyhowError::msg("GPU optimization timeout"))?;

        if !result.success {
            return Err(AnyhowError::msg("GPU optimization failed"));
        }

        match result.result_data {
            GpuResultData::OptimizationMetrics(metrics) => Ok(metrics),
            _ => Err(AnyhowError::msg("Unexpected result type")),
        }
    }
}

impl FaissGpuMemoryPool {
    pub fn new(device_id: i32, total_size: usize) -> Result<Self> {
        Ok(Self {
            device_id,
            total_size,
            allocated_size: AtomicUsize::new(0),
            free_blocks: Arc::new(Mutex::new(BTreeMap::new())),
            allocated_blocks: Arc::new(RwLock::new(HashMap::new())),
            allocation_stats: Arc::new(RwLock::new(AllocationStatistics::default())),
        })
    }

    pub fn allocate(&self, size: usize, block_type: MemoryBlockType) -> Result<GpuMemoryBlock> {
        // Align allocations to 256-byte boundaries.
        let aligned_size = (size + 255) & !255;

        if self.allocated_size.load(Ordering::Relaxed) + aligned_size > self.total_size {
            return Err(AnyhowError::msg("Out of GPU memory"));
        }

        let block = GpuMemoryBlock {
            gpu_address: self.allocated_size.load(Ordering::Relaxed), // Simulated address (offset into the pool)
            size: aligned_size,
            allocated_at: Instant::now(),
            ref_count: AtomicUsize::new(1),
            block_type,
        };

        self.allocated_size
            .fetch_add(aligned_size, Ordering::Relaxed);

        {
            let mut stats = self.allocation_stats.write().unwrap();
            stats.total_allocations += 1;
            let current_usage = self.allocated_size.load(Ordering::Relaxed);
            if current_usage > stats.peak_usage {
                stats.peak_usage = current_usage;
            }
        }

        Ok(block)
    }

    pub fn deallocate(&self, block: &GpuMemoryBlock) -> Result<()> {
        self.allocated_size.fetch_sub(block.size, Ordering::Relaxed);

        {
            let mut stats = self.allocation_stats.write().unwrap();
            stats.total_deallocations += 1;
        }

        Ok(())
    }

    pub fn calculate_fragmentation(&self) -> f32 {
        let allocated = self.allocated_size.load(Ordering::Relaxed);
        let free_blocks = self.free_blocks.lock().unwrap();
        let num_free_blocks = free_blocks.len();

        if allocated == 0 {
            return 0.0;
        }

        // Rough heuristic: free-block count relative to allocated KiB. Uses
        // floating-point division so allocations under 1 KiB do not divide by zero.
        (num_free_blocks as f32 / (allocated as f32 / 1024.0)) * 100.0
    }
}

impl GpuLoadBalancer {
    pub fn new(device_ids: &[i32], strategy: LoadBalancingStrategy) -> Self {
        let mut device_utilization = HashMap::new();
        let mut workload_distribution = HashMap::new();
        let mut performance_history = HashMap::new();

        for &device_id in device_ids {
            device_utilization.insert(device_id, 0.0);
            workload_distribution.insert(device_id, 0);
            performance_history.insert(device_id, VecDeque::new());
        }

        Self {
            device_utilization,
            workload_distribution,
            strategy,
            performance_history,
        }
    }

    /// Picks a device for an operation according to the configured strategy.
    pub fn select_device(&self, operation: &GpuOperation) -> i32 {
        match self.strategy {
            LoadBalancingStrategy::RoundRobin => self.select_round_robin(),
            LoadBalancingStrategy::LoadBased => self.select_load_based(),
            LoadBalancingStrategy::PerformanceBased => self.select_performance_based(),
            LoadBalancingStrategy::MemoryAware => self.select_memory_aware(),
            LoadBalancingStrategy::Hybrid => self.select_hybrid(operation),
        }
    }

    fn select_round_robin(&self) -> i32 {
        let total_workload: usize = self.workload_distribution.values().sum();
        let device_count = self.device_utilization.len();
        let target_device_index = total_workload % device_count;

        *self
            .device_utilization
            .keys()
            .nth(target_device_index)
            .unwrap_or(&0)
    }

    fn select_load_based(&self) -> i32 {
        self.device_utilization
            .iter()
            .min_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(&device_id, _)| device_id)
            .unwrap_or(0)
    }

    fn select_performance_based(&self) -> i32 {
        let mut best_device = 0;
        let mut best_score = f64::MIN;

        for (&device_id, history) in &self.performance_history {
            if let Some(recent_snapshot) = history.back() {
                let score = recent_snapshot.ops_per_second
                    / (recent_snapshot.avg_latency.as_secs_f64() + 1e-6);
                if score > best_score {
                    best_score = score;
                    best_device = device_id;
                }
            }
        }

        best_device
    }

    fn select_memory_aware(&self) -> i32 {
        // Currently uses device utilization as a proxy for memory pressure.
        self.device_utilization
            .iter()
            .min_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(&device_id, _)| device_id)
            .unwrap_or(0)
    }

    fn select_hybrid(&self, operation: &GpuOperation) -> i32 {
        match &operation.operation_type {
            GpuOperationType::Search { .. } => self.select_performance_based(),
            GpuOperationType::Add { .. } => self.select_memory_aware(),
            GpuOperationType::Train { .. } => self.select_load_based(),
            _ => self.select_round_robin(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_faiss_gpu_index_creation() {
        let faiss_config = FaissConfig::default();
        let gpu_config = FaissGpuConfig::default();

        let result = FaissGpuIndex::new(faiss_config, gpu_config).await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_gpu_memory_pool() {
        let pool = FaissGpuMemoryPool::new(0, 1024 * 1024).unwrap(); // 1 MiB pool
        let block = pool.allocate(1024, MemoryBlockType::Vectors).unwrap();
        assert_eq!(block.size, 1024);

        pool.deallocate(&block).unwrap();
        assert_eq!(pool.allocated_size.load(Ordering::Relaxed), 0);
    }

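    // A small additional check sketching the allocator behavior above:
    // sizes are rounded up to the 256-byte alignment used by `allocate`, and
    // requests that exceed the remaining pool capacity are rejected.
    #[test]
    fn test_gpu_memory_pool_alignment_and_oom() {
        let pool = FaissGpuMemoryPool::new(0, 4096).unwrap(); // 4 KiB pool

        // 100 bytes rounds up to the next 256-byte boundary.
        let block = pool.allocate(100, MemoryBlockType::Temporary).unwrap();
        assert_eq!(block.size, 256);

        // A request larger than the remaining capacity fails.
        assert!(pool.allocate(8192, MemoryBlockType::Vectors).is_err());

        pool.deallocate(&block).unwrap();
    }
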
    #[test]
    fn test_gpu_load_balancer() {
        let device_ids = vec![0, 1, 2];
        let balancer = GpuLoadBalancer::new(&device_ids, LoadBalancingStrategy::RoundRobin);

        assert_eq!(balancer.device_utilization.len(), 3);

        let operation = GpuOperation {
            id: "test".to_string(),
            operation_type: GpuOperationType::Optimize,
            input_data: GpuOperationData::Empty,
            output_size: 0,
            priority: 0,
            timeout: None,
            result_sender: None,
        };

        let selected_device = balancer.select_device(&operation);
        assert!(device_ids.contains(&selected_device));
    }
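
    // A further sketch of the load-based strategy: with utilization data in
    // place, the least-utilized device should be selected.
    #[test]
    fn test_gpu_load_balancer_load_based() {
        let device_ids = vec![0, 1];
        let mut balancer = GpuLoadBalancer::new(&device_ids, LoadBalancingStrategy::LoadBased);

        balancer.device_utilization.insert(0, 90.0);
        balancer.device_utilization.insert(1, 10.0);

        let operation = GpuOperation {
            id: "test".to_string(),
            operation_type: GpuOperationType::Optimize,
            input_data: GpuOperationData::Empty,
            output_size: 0,
            priority: 0,
            timeout: None,
            result_sender: None,
        };

        assert_eq!(balancer.select_device(&operation), 1);
    }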
}