use crate::{
    faiss_integration::{FaissConfig, FaissSearchParams},
    gpu::GpuExecutionConfig,
};
use anyhow::{Error as AnyhowError, Result};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc, Mutex, RwLock,
};
use std::time::{Duration, Instant};
use tokio::sync::oneshot;
use tracing::{debug, error, info, span, warn, Level};

/// Top-level configuration for GPU-accelerated FAISS indexing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FaissGpuConfig {
    /// CUDA device IDs to use.
    pub device_ids: Vec<i32>,
    /// Memory budget per device, in bytes.
    pub memory_per_device: usize,
    /// Distribute work across all configured devices.
    pub enable_multi_gpu: bool,
    pub memory_strategy: GpuMemoryStrategy,
    pub stream_config: GpuStreamConfig,
    pub optimization: GpuOptimizationConfig,
    pub error_handling: GpuErrorConfig,
}

/// How device memory is managed for the index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GpuMemoryStrategy {
    /// Pre-allocate a fixed pool up front.
    FixedPool,
    /// Allocate and free on demand.
    Dynamic,
    /// Use unified (managed) memory shared with the host.
    Unified,
    /// Stream data through device memory in fixed-size chunks.
    Streaming { chunk_size: usize },
}

/// Per-device compute stream configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuStreamConfig {
    pub streams_per_device: usize,
    /// Overlap kernel execution with memory transfers.
    pub enable_overlapping: bool,
    pub priority_levels: Vec<i32>,
    pub sync_strategy: SyncStrategy,
}

/// How host code synchronizes with in-flight GPU work.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SyncStrategy {
    Blocking,
    NonBlocking,
    EventBased,
    Cooperative,
}

/// Kernel- and batch-level optimization switches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuOptimizationConfig {
    pub enable_tensor_cores: bool,
    pub enable_mixed_precision: bool,
    pub enable_coalescing: bool,
    pub enable_kernel_fusion: bool,
    pub cache_config: GpuCacheConfig,
    pub batch_optimization: BatchOptimizationConfig,
}

/// On-device cache preferences.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuCacheConfig {
    pub l1_cache_config: CacheConfig,
    pub shared_memory_config: CacheConfig,
    pub enable_prefetching: bool,
    /// Cache line size in bytes.
    pub cache_line_size: usize,
}

/// L1/shared-memory split preference for kernels.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CacheConfig {
    PreferL1,
    PreferShared,
    Equal,
    Disabled,
}

/// Batch sizing policy per operation type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOptimizationConfig {
    /// Preferred batch size keyed by operation name (e.g. "search", "add").
    pub optimal_batch_sizes: HashMap<String, usize>,
    pub enable_dynamic_batching: bool,
    pub coalescence_threshold: usize,
    pub max_batch_size: usize,
}

/// Error handling and recovery policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuErrorConfig {
    pub enable_auto_recovery: bool,
    pub max_retries: usize,
    /// Fall back to CPU execution when GPU operations fail.
    pub fallback_to_cpu: bool,
    pub error_logging_level: String,
}

impl Default for FaissGpuConfig {
    fn default() -> Self {
        Self {
            device_ids: vec![0],
            memory_per_device: 2 * 1024 * 1024 * 1024, // 2 GiB
            enable_multi_gpu: false,
            memory_strategy: GpuMemoryStrategy::Dynamic,
            stream_config: GpuStreamConfig {
                streams_per_device: 4,
                enable_overlapping: true,
                priority_levels: vec![0, 1, 2],
                sync_strategy: SyncStrategy::NonBlocking,
            },
            optimization: GpuOptimizationConfig {
                enable_tensor_cores: true,
                enable_mixed_precision: true,
                enable_coalescing: true,
                enable_kernel_fusion: true,
                cache_config: GpuCacheConfig {
                    l1_cache_config: CacheConfig::PreferL1,
                    shared_memory_config: CacheConfig::PreferShared,
                    enable_prefetching: true,
                    cache_line_size: 128,
                },
                batch_optimization: BatchOptimizationConfig {
                    optimal_batch_sizes: {
                        let mut sizes = HashMap::new();
                        sizes.insert("search".to_string(), 1024);
                        sizes.insert("add".to_string(), 512);
                        sizes.insert("train".to_string(), 256);
                        sizes
                    },
                    enable_dynamic_batching: true,
                    coalescence_threshold: 64,
                    max_batch_size: 4096,
                },
            },
            error_handling: GpuErrorConfig {
                enable_auto_recovery: true,
                max_retries: 3,
                fallback_to_cpu: true,
                error_logging_level: "warn".to_string(),
            },
        }
    }
}
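
// A minimal sketch of overriding the defaults, e.g. for a hypothetical
// dual-GPU machine with a fixed pre-allocated pool (field values here are
// illustrative only):
//
//     let gpu_config = FaissGpuConfig {
//         device_ids: vec![0, 1],
//         enable_multi_gpu: true,
//         memory_strategy: GpuMemoryStrategy::FixedPool,
//         ..FaissGpuConfig::default()
//     };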

/// GPU-accelerated FAISS index with per-device memory pools, compute streams,
/// a shared work queue, and background monitoring tasks.
pub struct FaissGpuIndex {
    faiss_config: FaissConfig,
    gpu_config: FaissGpuConfig,
    gpu_runtime: Arc<GpuExecutionConfig>,
    memory_pools: Arc<RwLock<HashMap<i32, FaissGpuMemoryPool>>>,
    compute_streams: Arc<RwLock<HashMap<i32, Vec<GpuComputeStream>>>>,
    stats: Arc<RwLock<GpuPerformanceStats>>,
    work_queue: Arc<Mutex<VecDeque<GpuOperation>>>,
    results_cache: Arc<RwLock<HashMap<String, CachedResult>>>,
    load_balancer: Arc<RwLock<GpuLoadBalancer>>,
}

/// Size-bucketed memory pool for a single GPU device.
#[derive(Debug)]
pub struct FaissGpuMemoryPool {
    pub device_id: i32,
    /// Total pool capacity in bytes.
    pub total_size: usize,
    /// Bytes currently handed out.
    pub allocated_size: AtomicUsize,
    /// Free blocks bucketed by size.
    pub free_blocks: Arc<Mutex<BTreeMap<usize, Vec<GpuMemoryBlock>>>>,
    /// Live allocations keyed by GPU address.
    pub allocated_blocks: Arc<RwLock<HashMap<usize, GpuMemoryBlock>>>,
    pub allocation_stats: Arc<RwLock<AllocationStatistics>>,
}

/// A single allocation within a device memory pool.
#[derive(Debug)]
pub struct GpuMemoryBlock {
    pub gpu_address: usize,
    pub size: usize,
    pub allocated_at: Instant,
    pub ref_count: AtomicUsize,
    pub block_type: MemoryBlockType,
}

impl Clone for GpuMemoryBlock {
    fn clone(&self) -> Self {
        Self {
            gpu_address: self.gpu_address,
            size: self.size,
            allocated_at: self.allocated_at,
            // AtomicUsize is not Clone; snapshot the current count.
            ref_count: AtomicUsize::new(self.ref_count.load(Ordering::Relaxed)),
            block_type: self.block_type,
        }
    }
}

/// What a memory block is used for.
#[derive(Debug, Clone, Copy)]
pub enum MemoryBlockType {
    Vectors,
    IndexData,
    Temporary,
    Results,
}

/// A logical compute stream bound to one device.
#[derive(Debug)]
pub struct GpuComputeStream {
    pub stream_id: usize,
    pub device_id: i32,
    /// Opaque handle to the underlying stream.
    pub stream_handle: usize,
    pub priority: i32,
    pub current_operation: Arc<Mutex<Option<GpuOperation>>>,
    pub operation_history: Arc<RwLock<VecDeque<CompletedOperation>>>,
    pub utilization: Arc<RwLock<StreamUtilization>>,
}

/// A unit of work queued for GPU execution.
#[derive(Debug)]
pub struct GpuOperation {
    pub id: String,
    pub operation_type: GpuOperationType,
    pub input_data: GpuOperationData,
    pub output_size: usize,
    pub priority: i32,
    pub timeout: Option<Duration>,
    /// One-shot channel used to deliver the result to the caller.
    pub result_sender: Option<oneshot::Sender<GpuOperationResult>>,
}

#[derive(Debug, Clone)]
pub enum GpuOperationType {
    Search {
        query_vectors: Vec<Vec<f32>>,
        k: usize,
        search_params: FaissSearchParams,
    },
    Add {
        vectors: Vec<Vec<f32>>,
        ids: Vec<String>,
    },
    Train { training_vectors: Vec<Vec<f32>> },
    Optimize,
    MemoryTransfer {
        source: TransferSource,
        destination: TransferDestination,
        size: usize,
    },
}

#[derive(Debug, Clone)]
pub enum GpuOperationData {
    Vectors(Vec<Vec<f32>>),
    IndexData(Vec<u8>),
    QueryParams(HashMap<String, Vec<u8>>),
    Empty,
}

#[derive(Debug, Clone)]
pub enum TransferSource {
    CpuMemory(Vec<u8>),
    GpuMemory { device_id: i32, address: usize },
    Disk(std::path::PathBuf),
}

#[derive(Debug, Clone)]
pub enum TransferDestination {
    CpuMemory,
    GpuMemory { device_id: i32, address: usize },
    Disk(std::path::PathBuf),
}

/// Outcome of a completed GPU operation.
#[derive(Debug, Clone)]
pub struct GpuOperationResult {
    pub operation_id: String,
    pub success: bool,
    pub result_data: GpuResultData,
    pub execution_time: Duration,
    pub memory_used: usize,
    pub error_message: Option<String>,
}

#[derive(Debug, Clone)]
pub enum GpuResultData {
    /// Per-query lists of `(id, score)` pairs.
    SearchResults(Vec<Vec<(String, f32)>>),
    TrainingComplete,
    AdditionComplete,
    OptimizationMetrics(HashMap<String, f64>),
    TransferComplete,
    Error(String),
}

/// Record of a finished operation kept in per-stream history.
#[derive(Debug, Clone)]
pub struct CompletedOperation {
    pub operation_id: String,
    pub operation_type: String,
    pub start_time: Instant,
    pub end_time: Instant,
    pub success: bool,
    pub memory_used: usize,
}

#[derive(Debug, Clone, Default)]
pub struct StreamUtilization {
    pub total_operations: usize,
    pub total_execution_time: Duration,
    pub avg_execution_time: Duration,
    pub utilization_percentage: f32,
    pub idle_time: Duration,
}

/// Aggregated performance statistics across all devices.
#[derive(Debug, Clone, Default)]
pub struct GpuPerformanceStats {
    pub device_stats: HashMap<i32, DeviceStats>,
    pub overall_utilization: f32,
    pub memory_efficiency: f32,
    pub throughput: ThroughputMetrics,
    pub error_stats: ErrorStatistics,
    pub performance_trends: PerformanceTrends,
}

#[derive(Debug, Clone, Default)]
pub struct DeviceStats {
    pub utilization: f32,
    pub memory_usage: MemoryUsageStats,
    pub compute_performance: ComputePerformanceStats,
    /// Power draw in watts.
    pub power_consumption: f32,
    /// Temperature in degrees Celsius.
    pub temperature: f32,
}

#[derive(Debug, Clone, Default)]
pub struct MemoryUsageStats {
    pub total_memory: usize,
    pub used_memory: usize,
    pub free_memory: usize,
    pub peak_usage: usize,
    /// Fragmentation as a percentage.
    pub fragmentation: f32,
}

#[derive(Debug, Clone, Default)]
pub struct ComputePerformanceStats {
    pub flops: f64,
    pub memory_bandwidth_utilization: f32,
    pub kernel_efficiency: f32,
    pub occupancy: f32,
}

#[derive(Debug, Clone, Default)]
pub struct ThroughputMetrics {
    pub vectors_per_second: f64,
    pub operations_per_second: f64,
    pub transfer_rate_mbps: f64,
    pub search_qps: f64,
}

#[derive(Debug, Clone, Default)]
pub struct ErrorStatistics {
    pub total_errors: usize,
    pub recoverable_errors: usize,
    pub fatal_errors: usize,
    pub error_rate: f32,
    pub recovery_rate: f32,
}

/// Time-series samples used for trend analysis.
#[derive(Debug, Clone, Default)]
pub struct PerformanceTrends {
    pub utilization_trend: Vec<(Instant, f32)>,
    pub throughput_trend: Vec<(Instant, f64)>,
    pub memory_trend: Vec<(Instant, usize)>,
    pub error_trend: Vec<(Instant, f32)>,
}

#[derive(Debug, Clone, Default)]
pub struct AllocationStatistics {
    pub total_allocations: usize,
    pub total_deallocations: usize,
    pub peak_usage: usize,
    pub avg_allocation_size: usize,
    pub fragmentation_events: usize,
    pub oom_events: usize,
}

/// A cached operation result with hit accounting.
#[derive(Debug)]
pub struct CachedResult {
    pub data: GpuResultData,
    pub timestamp: Instant,
    pub hit_count: AtomicUsize,
    pub size: usize,
}

impl Clone for CachedResult {
    fn clone(&self) -> Self {
        Self {
            data: self.data.clone(),
            timestamp: self.timestamp,
            // AtomicUsize is not Clone; snapshot the current count.
            hit_count: AtomicUsize::new(self.hit_count.load(Ordering::Acquire)),
            size: self.size,
        }
    }
}

/// Tracks per-device load and decides where new work should run.
#[derive(Debug)]
pub struct GpuLoadBalancer {
    pub device_utilization: HashMap<i32, f32>,
    pub workload_distribution: HashMap<i32, usize>,
    pub strategy: LoadBalancingStrategy,
    pub performance_history: HashMap<i32, VecDeque<PerformanceSnapshot>>,
}

#[derive(Debug, Clone)]
pub enum LoadBalancingStrategy {
    RoundRobin,
    LoadBased,
    PerformanceBased,
    MemoryAware,
    Hybrid,
}

#[derive(Debug, Clone)]
pub struct PerformanceSnapshot {
    pub timestamp: Instant,
    pub utilization: f32,
    pub memory_usage: f32,
    pub ops_per_second: f64,
    pub avg_latency: Duration,
}

impl FaissGpuIndex {
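    /// Construct the index, initialize per-device pools and streams, and
    /// spawn the background worker tasks.
    ///
    /// A construction sketch (requires a Tokio runtime; `FaissConfig` comes
    /// from the sibling `faiss_integration` module):
    ///
    /// ```ignore
    /// let index = FaissGpuIndex::new(FaissConfig::default(), FaissGpuConfig::default()).await?;
    /// ```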
    pub async fn new(faiss_config: FaissConfig, gpu_config: FaissGpuConfig) -> Result<Self> {
        let span = span!(Level::INFO, "faiss_gpu_index_new");
        let _enter = span.enter();

        // Derive a base runtime config from the FAISS-specific settings
        // (currently unused, per the underscore prefix).
        let _base_gpu_config = crate::gpu::GpuConfig {
            device_id: gpu_config.device_ids.first().copied().unwrap_or(0),
            enable_mixed_precision: true,
            enable_tensor_cores: true,
            batch_size: 1024,
            memory_pool_size: gpu_config.memory_per_device,
            stream_count: gpu_config.stream_config.streams_per_device,
            enable_peer_access: gpu_config.enable_multi_gpu,
            enable_unified_memory: matches!(gpu_config.memory_strategy, GpuMemoryStrategy::Unified),
            enable_async_execution: true,
            enable_multi_gpu: gpu_config.enable_multi_gpu,
            preferred_gpu_ids: gpu_config.device_ids.clone(),
            dynamic_batch_sizing: true,
            enable_memory_compression: false,
            kernel_cache_size: 1024 * 1024,
            optimization_level: crate::gpu::OptimizationLevel::Performance,
            precision_mode: crate::gpu::PrecisionMode::Mixed,
        };

        let gpu_runtime = Arc::new(GpuExecutionConfig::default());

        // One memory pool per configured device.
        let mut memory_pools = HashMap::new();
        for &device_id in &gpu_config.device_ids {
            let pool = FaissGpuMemoryPool::new(device_id, gpu_config.memory_per_device)?;
            memory_pools.insert(device_id, pool);
        }

        // A set of compute streams per device.
        let mut compute_streams = HashMap::new();
        for &device_id in &gpu_config.device_ids {
            let streams = Self::create_compute_streams(device_id, &gpu_config.stream_config)?;
            compute_streams.insert(device_id, streams);
        }

        let load_balancer =
            GpuLoadBalancer::new(&gpu_config.device_ids, LoadBalancingStrategy::Hybrid);

        let device_count = gpu_config.device_ids.len();
        let index = Self {
            faiss_config,
            gpu_config,
            gpu_runtime,
            memory_pools: Arc::new(RwLock::new(memory_pools)),
            compute_streams: Arc::new(RwLock::new(compute_streams)),
            stats: Arc::new(RwLock::new(GpuPerformanceStats::default())),
            work_queue: Arc::new(Mutex::new(VecDeque::new())),
            results_cache: Arc::new(RwLock::new(HashMap::new())),
            load_balancer: Arc::new(RwLock::new(load_balancer)),
        };

        index.start_background_workers().await?;

        info!(
            "Created GPU-accelerated FAISS index with {} devices",
            device_count
        );
        Ok(index)
    }

    /// Create the configured number of logical compute streams for a device.
    fn create_compute_streams(
        device_id: i32,
        stream_config: &GpuStreamConfig,
    ) -> Result<Vec<GpuComputeStream>> {
        let mut streams = Vec::new();

        for i in 0..stream_config.streams_per_device {
            // Cycle through the configured priority levels; default to 0 if
            // none are configured (avoids a modulo-by-zero panic).
            let priority = if stream_config.priority_levels.is_empty() {
                0
            } else {
                stream_config.priority_levels[i % stream_config.priority_levels.len()]
            };

            let stream = GpuComputeStream {
                stream_id: i,
                device_id,
                // Synthetic handle derived from device and stream index.
                stream_handle: device_id as usize * 1000 + i,
                priority,
                current_operation: Arc::new(Mutex::new(None)),
                operation_history: Arc::new(RwLock::new(VecDeque::new())),
                utilization: Arc::new(RwLock::new(StreamUtilization::default())),
            };

            streams.push(stream);
        }

        Ok(streams)
    }

    /// Spawn the long-running worker tasks: operation processing, performance
    /// monitoring, memory management, and (for multi-GPU) load balancing.
    async fn start_background_workers(&self) -> Result<()> {
        let span = span!(Level::DEBUG, "start_background_workers");
        let _enter = span.enter();

        self.start_operation_processor().await?;
        self.start_performance_monitor().await?;
        self.start_memory_manager().await?;

        if self.gpu_config.enable_multi_gpu {
            self.start_load_balancer().await?;
        }

        debug!("Started background worker tasks");
        Ok(())
    }

    /// Drain the shared work queue on a dedicated task, dispatching each
    /// operation to the least-loaded stream.
    async fn start_operation_processor(&self) -> Result<()> {
        let work_queue = Arc::clone(&self.work_queue);
        let compute_streams = Arc::clone(&self.compute_streams);
        let stats = Arc::clone(&self.stats);
        let gpu_config = self.gpu_config.clone();

        tokio::spawn(async move {
            loop {
                // Pop inside a block so the lock is released before awaiting.
                if let Some(operation) = {
                    let mut queue = work_queue.lock().expect("lock poisoned");
                    queue.pop_front()
                } {
                    if let Err(e) = Self::process_gpu_operation(
                        operation,
                        &compute_streams,
                        &stats,
                        &gpu_config,
                    )
                    .await
                    {
                        error!("Failed to process GPU operation: {}", e);
                    }
                }

                // Avoid busy-spinning when the queue is empty.
                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
            }
        });

        Ok(())
    }

    async fn process_gpu_operation(
        mut operation: GpuOperation,
        compute_streams: &Arc<RwLock<HashMap<i32, Vec<GpuComputeStream>>>>,
        stats: &Arc<RwLock<GpuPerformanceStats>>,
        gpu_config: &FaissGpuConfig,
    ) -> Result<()> {
        let start_time = Instant::now();

        let (device_id, stream_id) =
            Self::select_optimal_stream(compute_streams, &operation).await?;

        // Take the sender out so the operation can be moved into the executor.
        let result_sender = operation.result_sender.take();

        let result =
            Self::execute_operation_on_device(operation, device_id, stream_id, gpu_config).await?;

        if let Some(sender) = result_sender {
            // The caller may have gone away; ignore a closed channel.
            let _ = sender.send(result.clone());
        }

        Self::update_operation_stats(stats, &result, start_time.elapsed()).await?;

        Ok(())
    }

    /// Pick the (device, stream) pair with the lowest recorded utilization.
    async fn select_optimal_stream(
        compute_streams: &Arc<RwLock<HashMap<i32, Vec<GpuComputeStream>>>>,
        _operation: &GpuOperation,
    ) -> Result<(i32, usize)> {
        let streams = compute_streams.read().expect("lock poisoned");

        let mut best_device = 0;
        let mut best_stream = 0;
        let mut lowest_utilization = f32::MAX;

        for (&device_id, device_streams) in streams.iter() {
            for (stream_id, stream) in device_streams.iter().enumerate() {
                let utilization = stream
                    .utilization
                    .read()
                    .expect("lock poisoned")
                    .utilization_percentage;
                if utilization < lowest_utilization {
                    lowest_utilization = utilization;
                    best_device = device_id;
                    best_stream = stream_id;
                }
            }
        }

        Ok((best_device, best_stream))
    }

    /// Execute an operation on the chosen device. The current implementation
    /// simulates execution with sleeps sized to each operation type.
    async fn execute_operation_on_device(
        operation: GpuOperation,
        _device_id: i32,
        _stream_id: usize,
        _gpu_config: &FaissGpuConfig,
    ) -> Result<GpuOperationResult> {
        let start_time = Instant::now();

        let result_data = match &operation.operation_type {
            GpuOperationType::Search {
                query_vectors, k, ..
            } => {
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

                // Fabricate k descending-score hits per query.
                let mut results = Vec::new();
                for _query in query_vectors {
                    let mut query_results = Vec::new();
                    for i in 0..*k {
                        query_results.push((format!("gpu_result_{i}"), 0.95 - (i as f32 * 0.05)));
                    }
                    results.push(query_results);
                }

                GpuResultData::SearchResults(results)
            }
            GpuOperationType::Add { vectors: _, .. } => {
                tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
                GpuResultData::AdditionComplete
            }
            GpuOperationType::Train { .. } => {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                GpuResultData::TrainingComplete
            }
            GpuOperationType::Optimize => {
                tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
                let mut metrics = HashMap::new();
                metrics.insert("optimization_improvement".to_string(), 15.0);
                metrics.insert("memory_efficiency".to_string(), 92.0);
                GpuResultData::OptimizationMetrics(metrics)
            }
            GpuOperationType::MemoryTransfer { size, .. } => {
                // Model the transfer at roughly 10 GiB/s.
                let transfer_time = *size as f64 / (10.0 * 1024.0 * 1024.0 * 1024.0);
                tokio::time::sleep(tokio::time::Duration::from_secs_f64(transfer_time)).await;
                GpuResultData::TransferComplete
            }
        };

        Ok(GpuOperationResult {
            operation_id: operation.id,
            success: true,
            result_data,
            execution_time: start_time.elapsed(),
            memory_used: 1024 * 1024, // placeholder: 1 MiB
            error_message: None,
        })
    }

    async fn update_operation_stats(
        stats: &Arc<RwLock<GpuPerformanceStats>>,
        result: &GpuOperationResult,
        execution_time: Duration,
    ) -> Result<()> {
        let mut stats = stats.write().expect("lock poisoned");

        stats.throughput.operations_per_second += 1.0 / execution_time.as_secs_f64();

        if !result.success {
            stats.error_stats.total_errors += 1;
        }

        Ok(())
    }

    /// Sample device metrics once per second on a background task.
    async fn start_performance_monitor(&self) -> Result<()> {
        let stats = Arc::clone(&self.stats);
        let device_ids = self.gpu_config.device_ids.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));

            loop {
                interval.tick().await;

                if let Err(e) = Self::collect_performance_metrics(&stats, &device_ids).await {
                    warn!("Failed to collect performance metrics: {}", e);
                }
            }
        });

        Ok(())
    }

    /// Refresh per-device statistics. Values are currently simulated; a real
    /// implementation would query the driver (e.g. NVML).
    async fn collect_performance_metrics(
        stats: &Arc<RwLock<GpuPerformanceStats>>,
        device_ids: &[i32],
    ) -> Result<()> {
        let mut stats = stats.write().expect("lock poisoned");

        for &device_id in device_ids {
            let device_stats = DeviceStats {
                utilization: 75.0 + (device_id as f32 * 5.0) % 25.0,
                memory_usage: MemoryUsageStats {
                    total_memory: 8 * 1024 * 1024 * 1024, // 8 GiB
                    used_memory: 6 * 1024 * 1024 * 1024,
                    free_memory: 2 * 1024 * 1024 * 1024,
                    peak_usage: 7 * 1024 * 1024 * 1024,
                    fragmentation: 5.0,
                },
                compute_performance: ComputePerformanceStats {
                    flops: 15.5e12, // ~15.5 TFLOPS
                    memory_bandwidth_utilization: 80.0,
                    kernel_efficiency: 85.0,
                    occupancy: 75.0,
                },
                power_consumption: 250.0, // watts
                temperature: 70.0,        // Celsius
            };

            stats.device_stats.insert(device_id, device_stats);
        }

        stats.overall_utilization = stats
            .device_stats
            .values()
            .map(|s| s.utilization)
            .sum::<f32>()
            / stats.device_stats.len() as f32;

        Ok(())
    }

    /// Periodically inspect memory pools for fragmentation and leaks.
    async fn start_memory_manager(&self) -> Result<()> {
        let memory_pools = Arc::clone(&self.memory_pools);
        let gpu_config = self.gpu_config.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));

            loop {
                interval.tick().await;

                if let Err(e) = Self::manage_gpu_memory(&memory_pools, &gpu_config).await {
                    warn!("Failed to manage GPU memory: {}", e);
                }
            }
        });

        Ok(())
    }

    async fn manage_gpu_memory(
        memory_pools: &Arc<RwLock<HashMap<i32, FaissGpuMemoryPool>>>,
        _gpu_config: &FaissGpuConfig,
    ) -> Result<()> {
        let pools = memory_pools.read().expect("lock poisoned");

        for (device_id, pool) in pools.iter() {
            let fragmentation = pool.calculate_fragmentation();
            if fragmentation > 20.0 {
                debug!(
                    "High fragmentation detected on device {}: {:.1}%",
                    device_id, fragmentation
                );
            }

            // Flag blocks live for over an hour as leak suspects.
            let allocated_blocks = pool.allocated_blocks.read().expect("lock poisoned");
            let now = Instant::now();
            for block in allocated_blocks.values() {
                if now.duration_since(block.allocated_at) > Duration::from_secs(3600) {
                    warn!(
                        "Potential memory leak detected on device {}: block allocated {} ago",
                        device_id,
                        humantime::format_duration(now.duration_since(block.allocated_at))
                    );
                }
            }
        }

        Ok(())
    }

    /// Periodically refresh per-device utilization used for load balancing.
    async fn start_load_balancer(&self) -> Result<()> {
        let load_balancer = Arc::clone(&self.load_balancer);
        let stats = Arc::clone(&self.stats);

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(2));

            loop {
                interval.tick().await;

                if let Err(e) = Self::update_load_balancing(&load_balancer, &stats).await {
                    warn!("Failed to update load balancing: {}", e);
                }
            }
        });

        Ok(())
    }

    async fn update_load_balancing(
        load_balancer: &Arc<RwLock<GpuLoadBalancer>>,
        stats: &Arc<RwLock<GpuPerformanceStats>>,
    ) -> Result<()> {
        let stats = stats.read().expect("lock poisoned");
        let mut balancer = load_balancer.write().expect("lock poisoned");

        for (&device_id, device_stats) in &stats.device_stats {
            balancer
                .device_utilization
                .insert(device_id, device_stats.utilization);

            let snapshot = PerformanceSnapshot {
                timestamp: Instant::now(),
                utilization: device_stats.utilization,
                memory_usage: device_stats.memory_usage.used_memory as f32
                    / device_stats.memory_usage.total_memory as f32
                    * 100.0,
                ops_per_second: 1000.0, // placeholder throughput estimate
                avg_latency: Duration::from_micros(250),
            };

            balancer
                .performance_history
                .entry(device_id)
                .or_default()
                .push_back(snapshot);

            // Keep a bounded history of the most recent 100 snapshots.
            if balancer.performance_history[&device_id].len() > 100 {
                balancer
                    .performance_history
                    .get_mut(&device_id)
                    .expect("device_id should exist in performance_history")
                    .pop_front();
            }
        }

        Ok(())
    }

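    /// Run a GPU-accelerated k-nearest-neighbor search over a batch of query
    /// vectors, returning per-query lists of `(id, score)` pairs.
    ///
    /// A usage sketch (assumes a constructed `index`; `FaissSearchParams`
    /// comes from the sibling `faiss_integration` module and is assumed here
    /// to provide `Default`):
    ///
    /// ```ignore
    /// let queries = vec![vec![0.1_f32; 128]];
    /// let hits = index.search_gpu(queries, 10, FaissSearchParams::default()).await?;
    /// assert_eq!(hits.len(), 1);
    /// assert_eq!(hits[0].len(), 10);
    /// ```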
    pub async fn search_gpu(
        &self,
        query_vectors: Vec<Vec<f32>>,
        k: usize,
        search_params: FaissSearchParams,
    ) -> Result<Vec<Vec<(String, f32)>>> {
        let span = span!(Level::DEBUG, "search_gpu");
        let _enter = span.enter();

        let (result_sender, result_receiver) = oneshot::channel();
        let operation = GpuOperation {
            id: uuid::Uuid::new_v4().to_string(),
            operation_type: GpuOperationType::Search {
                query_vectors: query_vectors.clone(),
                k,
                search_params,
            },
            input_data: GpuOperationData::Vectors(query_vectors),
            output_size: k * std::mem::size_of::<(String, f32)>(),
            priority: 1,
            timeout: Some(Duration::from_secs(30)),
            result_sender: Some(result_sender),
        };

        // Enqueue inside a block so the lock is released before awaiting.
        {
            let mut queue = self.work_queue.lock().expect("lock poisoned");
            queue.push_back(operation);
        }

        let result = result_receiver
            .await
            .map_err(|_| AnyhowError::msg("GPU operation was dropped before completion"))?;

        if !result.success {
            return Err(AnyhowError::msg(
                result
                    .error_message
                    .unwrap_or_else(|| "GPU operation failed".to_string()),
            ));
        }

        match result.result_data {
            GpuResultData::SearchResults(results) => Ok(results),
            _ => Err(AnyhowError::msg("Unexpected result type")),
        }
    }

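    /// Queue a batch of vectors (with their string IDs) for GPU-side insertion.
    ///
    /// A usage sketch (assumes a constructed `index`):
    ///
    /// ```ignore
    /// let vectors = vec![vec![0.0_f32; 128], vec![1.0_f32; 128]];
    /// let ids = vec!["a".to_string(), "b".to_string()];
    /// index.add_vectors_gpu(vectors, ids).await?;
    /// ```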
    pub async fn add_vectors_gpu(&self, vectors: Vec<Vec<f32>>, ids: Vec<String>) -> Result<()> {
        let span = span!(Level::DEBUG, "add_vectors_gpu");
        let _enter = span.enter();

        let (result_sender, result_receiver) = oneshot::channel();
        let operation = GpuOperation {
            id: uuid::Uuid::new_v4().to_string(),
            operation_type: GpuOperationType::Add {
                vectors: vectors.clone(),
                ids,
            },
            input_data: GpuOperationData::Vectors(vectors),
            output_size: 0,
            priority: 2,
            timeout: Some(Duration::from_secs(60)),
            result_sender: Some(result_sender),
        };

        {
            let mut queue = self.work_queue.lock().expect("lock poisoned");
            queue.push_back(operation);
        }

        let result = result_receiver
            .await
            .map_err(|_| AnyhowError::msg("GPU operation was dropped before completion"))?;

        if !result.success {
            return Err(AnyhowError::msg(
                result
                    .error_message
                    .unwrap_or_else(|| "GPU operation failed".to_string()),
            ));
        }

        Ok(())
    }

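    /// Return a point-in-time clone of the aggregated performance statistics,
    /// so callers do not hold the stats lock while inspecting them.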
    pub fn get_gpu_stats(&self) -> Result<GpuPerformanceStats> {
        let stats = self.stats.read().expect("lock poisoned");
        Ok(stats.clone())
    }

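    /// Queue an optimization pass and wait for its metrics.
    ///
    /// A usage sketch (assumes a constructed `index`; the metric key
    /// "optimization_improvement" comes from the simulated executor above):
    ///
    /// ```ignore
    /// let metrics = index.optimize_gpu_performance().await?;
    /// if let Some(gain) = metrics.get("optimization_improvement") {
    ///     println!("optimization improved throughput by {gain}%");
    /// }
    /// ```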
    pub async fn optimize_gpu_performance(&self) -> Result<HashMap<String, f64>> {
        let span = span!(Level::INFO, "optimize_gpu_performance");
        let _enter = span.enter();

        let (result_sender, result_receiver) = oneshot::channel();
        let operation = GpuOperation {
            id: uuid::Uuid::new_v4().to_string(),
            operation_type: GpuOperationType::Optimize,
            input_data: GpuOperationData::Empty,
            output_size: 0,
            priority: 0,
            timeout: Some(Duration::from_secs(120)),
            result_sender: Some(result_sender),
        };

        {
            let mut queue = self.work_queue.lock().expect("lock poisoned");
            queue.push_back(operation);
        }

        let result = result_receiver
            .await
            .map_err(|_| AnyhowError::msg("GPU optimization was dropped before completion"))?;

        if !result.success {
            return Err(AnyhowError::msg("GPU optimization failed"));
        }

        match result.result_data {
            GpuResultData::OptimizationMetrics(metrics) => Ok(metrics),
            _ => Err(AnyhowError::msg("Unexpected result type")),
        }
    }
}

impl FaissGpuMemoryPool {
    pub fn new(device_id: i32, total_size: usize) -> Result<Self> {
        Ok(Self {
            device_id,
            total_size,
            allocated_size: AtomicUsize::new(0),
            free_blocks: Arc::new(Mutex::new(BTreeMap::new())),
            allocated_blocks: Arc::new(RwLock::new(HashMap::new())),
            allocation_stats: Arc::new(RwLock::new(AllocationStatistics::default())),
        })
    }

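    /// Reserve `size` bytes from the pool, rounded up to a 256-byte boundary.
    ///
    /// A usage sketch (the pool currently only tracks accounting; no device
    /// memory is touched):
    ///
    /// ```ignore
    /// let pool = FaissGpuMemoryPool::new(0, 64 * 1024 * 1024)?;
    /// let block = pool.allocate(4096, MemoryBlockType::Temporary)?;
    /// assert_eq!(block.size, 4096);
    /// pool.deallocate(&block)?;
    /// ```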
    pub fn allocate(&self, size: usize, block_type: MemoryBlockType) -> Result<GpuMemoryBlock> {
        // Round the request up to a 256-byte boundary.
        let aligned_size = (size + 255) & !255;

        if self.allocated_size.load(Ordering::Relaxed) + aligned_size > self.total_size {
            return Err(AnyhowError::msg("Out of GPU memory"));
        }

        let block = GpuMemoryBlock {
            // Bump-pointer style placeholder address: the current high-water mark.
            gpu_address: self.allocated_size.load(Ordering::Relaxed),
            size: aligned_size,
            allocated_at: Instant::now(),
            ref_count: AtomicUsize::new(1),
            block_type,
        };

        self.allocated_size
            .fetch_add(aligned_size, Ordering::Relaxed);

        {
            let mut stats = self.allocation_stats.write().expect("lock poisoned");
            stats.total_allocations += 1;
            let current_usage = self.allocated_size.load(Ordering::Relaxed);
            if current_usage > stats.peak_usage {
                stats.peak_usage = current_usage;
            }
        }

        Ok(block)
    }

    pub fn deallocate(&self, block: &GpuMemoryBlock) -> Result<()> {
        self.allocated_size.fetch_sub(block.size, Ordering::Relaxed);

        {
            let mut stats = self.allocation_stats.write().expect("lock poisoned");
            stats.total_deallocations += 1;
        }

        Ok(())
    }

    /// Rough fragmentation estimate: free-block count relative to allocated KiB.
    pub fn calculate_fragmentation(&self) -> f32 {
        let allocated = self.allocated_size.load(Ordering::Relaxed);
        let free_blocks = self.free_blocks.lock().expect("lock poisoned");
        let num_free_blocks = free_blocks.len();

        if allocated == 0 {
            return 0.0;
        }

        (num_free_blocks as f32 / (allocated / 1024) as f32) * 100.0
    }
}

impl GpuLoadBalancer {
    pub fn new(device_ids: &[i32], strategy: LoadBalancingStrategy) -> Self {
        let mut device_utilization = HashMap::new();
        let mut workload_distribution = HashMap::new();
        let mut performance_history = HashMap::new();

        for &device_id in device_ids {
            device_utilization.insert(device_id, 0.0);
            workload_distribution.insert(device_id, 0);
            performance_history.insert(device_id, VecDeque::new());
        }

        Self {
            device_utilization,
            workload_distribution,
            strategy,
            performance_history,
        }
    }

    /// Choose a device for the given operation according to the configured
    /// strategy.
    pub fn select_device(&self, operation: &GpuOperation) -> i32 {
        match self.strategy {
            LoadBalancingStrategy::RoundRobin => self.select_round_robin(),
            LoadBalancingStrategy::LoadBased => self.select_load_based(),
            LoadBalancingStrategy::PerformanceBased => self.select_performance_based(),
            LoadBalancingStrategy::MemoryAware => self.select_memory_aware(),
            LoadBalancingStrategy::Hybrid => self.select_hybrid(operation),
        }
    }

    fn select_round_robin(&self) -> i32 {
        // Approximate round-robin: cycle by total work issued so far. HashMap
        // iteration order makes the device sequence arbitrary but still even.
        let total_workload: usize = self.workload_distribution.values().sum();
        let device_count = self.device_utilization.len();
        let target_device_index = total_workload % device_count;

        *self
            .device_utilization
            .keys()
            .nth(target_device_index)
            .unwrap_or(&0)
    }

    fn select_load_based(&self) -> i32 {
        // Least-utilized device wins.
        self.device_utilization
            .iter()
            .min_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(&device_id, _)| device_id)
            .unwrap_or(0)
    }

    fn select_performance_based(&self) -> i32 {
        let mut best_device = 0;
        let mut best_score = f64::MIN;

        for (&device_id, history) in &self.performance_history {
            if let Some(recent_snapshot) = history.back() {
                // Score by throughput per unit latency; the epsilon guards
                // against division by zero.
                let score = recent_snapshot.ops_per_second
                    / (recent_snapshot.avg_latency.as_secs_f64() + 1e-6);
                if score > best_score {
                    best_score = score;
                    best_device = device_id;
                }
            }
        }

        best_device
    }

    fn select_memory_aware(&self) -> i32 {
        // Currently uses utilization as a proxy for available memory.
        self.device_utilization
            .iter()
            .min_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(&device_id, _)| device_id)
            .unwrap_or(0)
    }

    fn select_hybrid(&self, operation: &GpuOperation) -> i32 {
        // Pick the strategy best suited to each operation type.
        match &operation.operation_type {
            GpuOperationType::Search { .. } => self.select_performance_based(),
            GpuOperationType::Add { .. } => self.select_memory_aware(),
            GpuOperationType::Train { .. } => self.select_load_based(),
            _ => self.select_round_robin(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_faiss_gpu_index_creation() {
        let faiss_config = FaissConfig::default();
        let gpu_config = FaissGpuConfig::default();

        let result = FaissGpuIndex::new(faiss_config, gpu_config).await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_gpu_memory_pool() {
        let pool = FaissGpuMemoryPool::new(0, 1024 * 1024).unwrap(); // 1 MiB pool

        let block = pool.allocate(1024, MemoryBlockType::Vectors).unwrap();
        assert_eq!(block.size, 1024);

        pool.deallocate(&block).unwrap();
        assert_eq!(pool.allocated_size.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_gpu_load_balancer() {
        let device_ids = vec![0, 1, 2];
        let balancer = GpuLoadBalancer::new(&device_ids, LoadBalancingStrategy::RoundRobin);

        assert_eq!(balancer.device_utilization.len(), 3);

        let operation = GpuOperation {
            id: "test".to_string(),
            operation_type: GpuOperationType::Optimize,
            input_data: GpuOperationData::Empty,
            output_size: 0,
            priority: 0,
            timeout: None,
            result_sender: None,
        };

        let selected_device = balancer.select_device(&operation);
        assert!(device_ids.contains(&selected_device));
    }
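
    /// Added sketch: checks the 256-byte rounding behavior of `allocate`
    /// (not in the original tests; derived from the alignment arithmetic).
    #[test]
    fn test_gpu_memory_pool_alignment() {
        let pool = FaissGpuMemoryPool::new(0, 1024 * 1024).unwrap();

        // 300 bytes rounds up to the next 256-byte boundary, i.e. 512.
        let block = pool.allocate(300, MemoryBlockType::Temporary).unwrap();
        assert_eq!(block.size, 512);

        // Exact multiples of 256 are unchanged.
        let exact = pool.allocate(256, MemoryBlockType::Temporary).unwrap();
        assert_eq!(exact.size, 256);
    }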
}