// oxirs_embed/acceleration/gpu.rs

1//! GPU Acceleration for Embedding Computations
2//!
3//! This module provides GPU-accelerated implementations of embedding operations
4//! using scirs2-linalg GPU features for CUDA, OpenCL, ROCm, and Metal backends.
5
6use crate::models::common::*;
7use anyhow::Result;
8use scirs2_core::ndarray_ext::{Array1, Array2};
9use scirs2_core::random::Random;
10
11#[cfg(feature = "gpu")]
12use scirs2_linalg::gpu::{GpuArray, GpuContext, GpuError};
13
/// Memory pool for GPU buffers.
///
/// Recycles fixed-size `GpuArray<f32>` buffers to avoid repeated
/// device allocations. Buffers of a different size than `buffer_size`
/// are not pooled (see `return_buffer`).
#[cfg(feature = "gpu")]
#[derive(Debug)]
pub struct GpuMemoryPool {
    // FIFO of idle buffers ready for reuse.
    available_buffers: VecDeque<GpuArray<f32>>,
    // Element count each pooled buffer must have to be accepted back.
    buffer_size: usize,
    // Total bytes allocated (NOTE(review): no code in this file updates
    // this counter — confirm it is maintained elsewhere).
    total_allocated: AtomicU64,
    // High-water mark of allocation in bytes (same caveat as above).
    peak_usage: AtomicU64,
}
23
/// Adaptive batch sizing configuration.
///
/// Bounds and targets used by the adaptive batching logic in
/// `GpuEmbeddingAccelerator` when tuning batch sizes from runtime feedback.
#[cfg(feature = "gpu")]
#[derive(Debug, Clone)]
pub struct AdaptiveBatchConfig {
    // Lower clamp for any computed batch size.
    pub min_batch_size: usize,
    // Upper clamp for any computed batch size.
    pub max_batch_size: usize,
    // Desired GPU utilization fraction (0.0–1.0).
    pub target_gpu_utilization: f32,
    // Fraction of GPU memory usage above which batching should back off
    // (NOTE(review): not read anywhere in this file — confirm usage).
    pub memory_usage_threshold: f32,
}
33
/// Enhanced GPU-accelerated embedding computations with memory pooling and adaptive batching.
#[cfg(feature = "gpu")]
pub struct GpuEmbeddingAccelerator {
    // Handle to the underlying GPU device context.
    context: GpuContext,
    // Device index this accelerator was created on.
    device_id: u32,
    // Shared pool of reusable device buffers. Locked with `.lock().await`,
    // so this is an async (tokio-style) Mutex — presumably re-exported via
    // `crate::models::common` — TODO confirm.
    memory_pool: Arc<Mutex<GpuMemoryPool>>,
    // Static bounds/targets for adaptive batch sizing.
    batch_config: AdaptiveBatchConfig,
    // Rolling performance counters (async RwLock; same caveat as above).
    performance_stats: Arc<RwLock<GpuPerformanceStats>>,
    // Current adaptive batch size, tuned from performance feedback.
    optimal_batch_size: Arc<AtomicU64>,
}
44
/// GPU performance statistics.
///
/// Accumulated by `adaptive_batch_processing`; read when building a
/// `GpuPerformanceReport`.
#[cfg(feature = "gpu")]
#[derive(Debug, Default)]
pub struct GpuPerformanceStats {
    // Number of adaptive batch operations recorded.
    pub total_operations: u64,
    // Wall-clock time summed across recorded operations.
    pub total_compute_time: Duration,
    // Host<->device transfer count (NOTE(review): never incremented in
    // this file — confirm it is maintained elsewhere).
    pub memory_transfers: u64,
    // Cache hit counter (same caveat as above).
    pub cache_hits: u64,
    // Cache miss counter (same caveat as above).
    pub cache_misses: u64,
    // Crude running blend of batch sizes (averaged pairwise per update).
    pub average_batch_size: f32,
    // Most recent GPU utilization estimate, as a percentage.
    pub gpu_utilization_percentage: f32,
}
57
/// Comprehensive GPU performance report.
///
/// Point-in-time snapshot derived from `GpuPerformanceStats` and the
/// memory pool counters; produced by `get_performance_report`.
#[cfg(feature = "gpu")]
#[derive(Debug)]
pub struct GpuPerformanceReport {
    // Device index the report describes.
    pub device_id: u32,
    // Total recorded operations since creation or last reset.
    pub total_operations: u64,
    // Mean wall-clock time per operation (zero when no operations yet).
    pub average_compute_time: Duration,
    // Last measured GPU utilization, as a percentage.
    pub gpu_utilization: f32,
    // Pool-reported allocated memory, in MiB.
    pub memory_allocated_mb: f64,
    // Pool-reported peak memory, in MiB.
    pub memory_peak_mb: f64,
    // cache_hits / (cache_hits + cache_misses); 0.0 when no samples.
    pub cache_hit_rate: f32,
    // Current adaptive batch size.
    pub optimal_batch_size: usize,
}
71
#[cfg(feature = "gpu")]
impl GpuMemoryPool {
    /// Create an empty pool that will recycle buffers of exactly
    /// `buffer_size` elements, pre-reserving `initial_pool_size` slots.
    pub fn new(buffer_size: usize, initial_pool_size: usize) -> Self {
        let available_buffers = VecDeque::with_capacity(initial_pool_size);
        Self {
            available_buffers,
            buffer_size,
            total_allocated: AtomicU64::new(0),
            peak_usage: AtomicU64::new(0),
        }
    }

    /// Take a recycled buffer from the pool, if one is available.
    pub fn get_buffer(&mut self) -> Option<GpuArray<f32>> {
        self.available_buffers.pop_front()
    }

    /// Hand a buffer back for later reuse.
    ///
    /// Only buffers whose length matches the pool's `buffer_size` are kept;
    /// anything else is dropped so its device memory is released.
    pub fn return_buffer(&mut self, buffer: GpuArray<f32>) {
        if buffer.len() != self.buffer_size {
            return; // wrong size: let it drop and auto-deallocate
        }
        self.available_buffers.push_back(buffer);
    }

    /// Snapshot of `(total_allocated_bytes, peak_usage_bytes)`.
    pub fn get_memory_stats(&self) -> (u64, u64) {
        let allocated = self.total_allocated.load(Ordering::Relaxed);
        let peak = self.peak_usage.load(Ordering::Relaxed);
        (allocated, peak)
    }
}
101
#[cfg(feature = "gpu")]
impl GpuEmbeddingAccelerator {
    /// Create a new enhanced GPU accelerator with memory pooling and adaptive batching.
    ///
    /// # Errors
    /// Returns a `GpuError` if the GPU context for `device_id` cannot be created.
    pub fn new(device_id: u32) -> Result<Self, GpuError> {
        let context = GpuContext::new(device_id)?;

        // 1 MiB-element buffers, capacity for 10 recycled buffers.
        let memory_pool = Arc::new(Mutex::new(GpuMemoryPool::new(1024 * 1024, 10)));

        let batch_config = AdaptiveBatchConfig {
            min_batch_size: 32,
            max_batch_size: 8192,
            target_gpu_utilization: 0.85,
            memory_usage_threshold: 0.8,
        };

        Ok(Self {
            context,
            device_id,
            memory_pool,
            batch_config,
            performance_stats: Arc::new(RwLock::new(GpuPerformanceStats::default())),
            optimal_batch_size: Arc::new(AtomicU64::new(512)), // reasonable starting default
        })
    }

    /// Get optimal batch size based on recent performance, clamped to the
    /// configured bounds and never larger than `data_size`.
    pub async fn get_optimal_batch_size(&self, data_size: usize) -> usize {
        let optimal = self.optimal_batch_size.load(Ordering::Relaxed) as usize;
        let config_min = self.batch_config.min_batch_size;
        let config_max = self.batch_config.max_batch_size;

        // Clamp to configuration bounds and data size
        optimal.clamp(config_min, config_max.min(data_size))
    }

    /// Update optimal batch size based on performance feedback.
    ///
    /// `performance_score` is expected in [0, 1]: scores above 0.8 grow the
    /// batch by 10%, scores below 0.5 shrink it by 10%, anything in between
    /// leaves it unchanged. The result is clamped to the configured bounds.
    pub async fn update_batch_size_feedback(&self, batch_size: usize, performance_score: f32) {
        // `batch_size` itself is not used here; the adjustment is applied to
        // the stored optimum, which the caller obtained from us.
        let _ = batch_size;
        let current_optimal = self.optimal_batch_size.load(Ordering::Relaxed) as usize;

        // Simple adaptive algorithm: increase if performance is good, decrease if poor
        let new_optimal = if performance_score > 0.8 {
            // Good performance, try larger batches
            (current_optimal as f32 * 1.1).round() as usize
        } else if performance_score < 0.5 {
            // Poor performance, try smaller batches
            (current_optimal as f32 * 0.9).round() as usize
        } else {
            current_optimal
        };

        let clamped_optimal = new_optimal.clamp(
            self.batch_config.min_batch_size,
            self.batch_config.max_batch_size,
        );

        self.optimal_batch_size
            .store(clamped_optimal as u64, Ordering::Relaxed);
    }

    /// GPU-accelerated batch distance computation.
    ///
    /// Uploads both vector sets, computes pairwise L2 distances on-device,
    /// and downloads the flat result.
    pub fn batch_l2_distances_gpu(
        &self,
        vectors_a: &[Array1<f64>],
        vectors_b: &[Array1<f64>],
    ) -> Result<Vec<f64>, GpuError> {
        // Convert to GPU arrays
        let gpu_a = self.upload_vectors_to_gpu(vectors_a)?;
        let gpu_b = self.upload_vectors_to_gpu(vectors_b)?;

        // Compute distances on GPU
        let gpu_distances = gpu_a.batch_l2_distance(&gpu_b)?;

        // Download results
        let distances = gpu_distances.download_to_host()?;
        Ok(distances)
    }

    /// GPU-accelerated cosine similarity matrix over `vectors`.
    pub fn cosine_similarity_matrix_gpu(
        &self,
        vectors: &[Array1<f64>],
    ) -> Result<Array2<f64>, GpuError> {
        let gpu_vectors = self.upload_vectors_to_gpu(vectors)?;
        let gpu_similarity_matrix = gpu_vectors.cosine_similarity_matrix()?;
        let similarity_matrix = gpu_similarity_matrix.download_to_host_array2()?;
        Ok(similarity_matrix)
    }

    /// GPU-accelerated gradient updates for large embedding matrices.
    ///
    /// Each embedding is uploaded, updated in place on-device with the paired
    /// gradient (SGD step with L2 regularization), then downloaded back.
    /// Extra gradients (beyond `embeddings.len()`) are ignored by `zip`.
    pub fn batch_gradient_update_gpu(
        &self,
        embeddings: &mut [Array2<f64>],
        gradients: &[Array2<f64>],
        learning_rate: f64,
        l2_reg: f64,
    ) -> Result<(), GpuError> {
        for (embedding, gradient) in embeddings.iter_mut().zip(gradients.iter()) {
            // Upload to GPU
            let mut gpu_embedding = self.context.upload_array2(embedding)?;
            let gpu_gradient = self.context.upload_array2(gradient)?;

            // Perform gradient update on GPU
            gpu_embedding.gradient_update(&gpu_gradient, learning_rate, l2_reg)?;

            // Download updated embeddings
            *embedding = gpu_embedding.download_to_host_array2()?;
        }
        Ok(())
    }

    /// Advanced GPU-accelerated adaptive batch processing with memory pooling.
    ///
    /// Splits `data` into adaptively sized chunks, runs `process_fn` on each,
    /// and feeds the observed throughput back into the batch-size tuner.
    pub async fn adaptive_batch_processing<T, R>(
        &self,
        data: &[T],
        mut process_fn: impl FnMut(&[T]) -> Result<Vec<R>, GpuError>,
    ) -> Result<Vec<R>, GpuError> {
        let start_time = Instant::now();
        let batch_size = self.get_optimal_batch_size(data.len()).await;

        let mut results = Vec::with_capacity(data.len());
        let mut total_processing_time = Duration::ZERO;

        for chunk in data.chunks(batch_size) {
            let chunk_start = Instant::now();
            let chunk_results = process_fn(chunk)?;
            let chunk_time = chunk_start.elapsed();

            results.extend(chunk_results);
            total_processing_time += chunk_time;
        }

        // Utilization = fraction of wall time spent inside process_fn.
        let total_time = start_time.elapsed();
        let gpu_utilization = total_processing_time.as_secs_f32() / total_time.as_secs_f32();
        let performance_score = gpu_utilization.min(1.0);

        self.update_batch_size_feedback(batch_size, performance_score)
            .await;

        // Update performance statistics
        let mut stats = self.performance_stats.write().await;
        stats.total_operations += 1;
        stats.total_compute_time += total_time;
        stats.gpu_utilization_percentage = gpu_utilization * 100.0;
        // Crude running blend, not a true mean over all operations.
        stats.average_batch_size = (stats.average_batch_size + batch_size as f32) / 2.0;

        Ok(results)
    }

    /// GPU-accelerated matrix multiplication with memory reuse.
    ///
    /// Pooled buffers are fixed at the pool's `buffer_size`; assumes the
    /// inputs fit that layout when a pooled buffer is used — TODO confirm
    /// `copy_from_host` validates the slice length.
    pub async fn optimized_matrix_multiply(
        &self,
        a: &Array2<f32>,
        b: &Array2<f32>,
    ) -> Result<Array2<f32>, GpuError> {
        let mut pool = self.memory_pool.lock().await;

        // Try to get buffers from pool
        let gpu_a = match pool.get_buffer() {
            Some(mut buffer) => {
                buffer.copy_from_host(a.as_slice().unwrap())?;
                buffer
            }
            None => self.context.upload_array2_f32(a)?,
        };

        let gpu_b = match pool.get_buffer() {
            Some(mut buffer) => {
                buffer.copy_from_host(b.as_slice().unwrap())?;
                buffer
            }
            None => self.context.upload_array2_f32(b)?,
        };

        // Perform matrix multiplication
        let gpu_result = gpu_a.matrix_multiply(&gpu_b)?;
        let result = gpu_result.download_to_host_array2()?;

        // Return buffers to pool (wrong-size buffers are dropped by the pool)
        pool.return_buffer(gpu_a);
        pool.return_buffer(gpu_b);

        Ok(result)
    }

    /// High-performance embedding search with GPU acceleration.
    ///
    /// Returns up to `top_k` `(index, similarity)` pairs sorted by
    /// descending cosine similarity.
    pub async fn gpu_embedding_search(
        &self,
        query_embedding: &Array1<f32>,
        database_embeddings: &[Array1<f32>],
        top_k: usize,
    ) -> Result<Vec<(usize, f32)>, GpuError> {
        // Use adaptive batching for large databases
        let batch_size = self.get_optimal_batch_size(database_embeddings.len()).await;
        let mut all_similarities = Vec::with_capacity(database_embeddings.len());

        // Process in adaptive batches
        for (batch_idx, batch) in database_embeddings.chunks(batch_size).enumerate() {
            let similarities = self
                .compute_batch_similarities(query_embedding, batch)
                .await?;

            for (local_idx, similarity) in similarities.iter().enumerate() {
                // Every chunk except the last has exactly `batch_size` items,
                // so this global index is correct.
                let global_idx = batch_idx * batch_size + local_idx;
                all_similarities.push((global_idx, *similarity));
            }
        }

        // Sort descending by similarity. BUG FIX: `total_cmp` is NaN-safe;
        // the previous `partial_cmp(..).unwrap()` panicked if any similarity
        // was NaN (e.g. from a zero-norm embedding).
        all_similarities.sort_by(|a, b| b.1.total_cmp(&a.1));
        all_similarities.truncate(top_k);

        Ok(all_similarities)
    }

    /// Compute cosine similarities between `query` and every vector in
    /// `batch` with GPU acceleration.
    async fn compute_batch_similarities(
        &self,
        query: &Array1<f32>,
        batch: &[Array1<f32>],
    ) -> Result<Vec<f32>, GpuError> {
        // Upload query and batch to GPU
        let gpu_query = self.context.upload_array1_f32(query)?;
        let gpu_batch = self.upload_batch_to_gpu(batch)?;

        // Compute cosine similarities
        let gpu_similarities = gpu_query.batch_cosine_similarity(&gpu_batch)?;
        let similarities = gpu_similarities.download_to_host()?;

        Ok(similarities)
    }

    /// GPU-accelerated Xavier (Glorot uniform) initialization for large
    /// embedding matrices.
    ///
    /// BUG FIX: each matrix now uses a distinct seed (`seed + index`);
    /// previously every matrix reused the same seed, so equal-shaped
    /// matrices were initialized identically.
    pub fn xavier_init_gpu(
        &self,
        shapes: &[(usize, usize)],
        fan_in: usize,
        fan_out: usize,
        seed: u64,
    ) -> Result<Vec<Array2<f64>>, GpuError> {
        // Standard Xavier uniform limit: sqrt(6 / (fan_in + fan_out)).
        let limit = (6.0 / (fan_in + fan_out) as f64).sqrt();

        let mut results = Vec::with_capacity(shapes.len());
        for (i, &shape) in shapes.iter().enumerate() {
            let gpu_array = self.context.random_uniform_array2(
                shape,
                -limit,
                limit,
                seed.wrapping_add(i as u64),
            )?;
            let host_array = gpu_array.download_to_host_array2()?;
            results.push(host_array);
        }
        Ok(results)
    }

    /// GPU-accelerated contrastive learning updates.
    ///
    /// Performs the update on-device and writes the refreshed embeddings
    /// back into `entity_embeddings`. Returns the contrastive loss.
    pub fn contrastive_learning_gpu(
        &self,
        entity_embeddings: &mut [Array1<f32>],
        similarity_pairs: &[(usize, usize)],
        negative_samples: &[(usize, usize)],
        temperature: f32,
        learning_rate: f32,
    ) -> Result<f32, GpuError> {
        // Upload embeddings to GPU
        let mut gpu_embeddings = self.upload_f32_vectors_to_gpu(entity_embeddings)?;

        // Compute contrastive loss and gradients on GPU
        let loss = gpu_embeddings.contrastive_learning_update(
            similarity_pairs,
            negative_samples,
            temperature,
            learning_rate,
        )?;

        // Download updated embeddings
        for (i, embedding) in entity_embeddings.iter_mut().enumerate() {
            *embedding = gpu_embeddings.get_vector(i)?.download_to_host_array1()?;
        }

        Ok(loss)
    }

    /// Helper: upload a batch of f64 vectors to the device.
    fn upload_vectors_to_gpu(&self, vectors: &[Array1<f64>]) -> Result<GpuArray<f64>, GpuError> {
        self.context.upload_vector_batch(vectors)
    }

    /// Helper: upload a batch of f32 vectors to the device.
    fn upload_f32_vectors_to_gpu(
        &self,
        vectors: &[Array1<f32>],
    ) -> Result<GpuArray<f32>, GpuError> {
        self.context.upload_f32_vector_batch(vectors)
    }

    /// Human-readable device identification string.
    pub fn device_info(&self) -> String {
        format!(
            "GPU Device {}: {}",
            self.device_id,
            self.context.device_name()
        )
    }

    /// Get available GPU memory in bytes, as reported by the context.
    pub fn available_memory(&self) -> Result<u64, GpuError> {
        self.context.available_memory()
    }

    /// GPU memory and performance monitoring snapshot.
    pub async fn get_performance_report(&self) -> GpuPerformanceReport {
        let stats = self.performance_stats.read().await;
        let (allocated, peak) = {
            let pool = self.memory_pool.lock().await;
            pool.get_memory_stats()
        };

        GpuPerformanceReport {
            device_id: self.device_id,
            total_operations: stats.total_operations,
            average_compute_time: if stats.total_operations > 0 {
                // `as u32` truncates beyond ~4.3e9 operations; acceptable for
                // a diagnostic average.
                stats.total_compute_time / stats.total_operations as u32
            } else {
                Duration::ZERO
            },
            gpu_utilization: stats.gpu_utilization_percentage,
            memory_allocated_mb: allocated as f64 / (1024.0 * 1024.0),
            memory_peak_mb: peak as f64 / (1024.0 * 1024.0),
            cache_hit_rate: if stats.cache_hits + stats.cache_misses > 0 {
                stats.cache_hits as f32 / (stats.cache_hits + stats.cache_misses) as f32
            } else {
                0.0
            },
            optimal_batch_size: self.optimal_batch_size.load(Ordering::Relaxed) as usize,
        }
    }

    /// Reset performance statistics and restore the default batch size.
    pub async fn reset_performance_stats(&self) {
        let mut stats = self.performance_stats.write().await;
        *stats = GpuPerformanceStats::default();
        self.optimal_batch_size.store(512, Ordering::Relaxed);
    }

    /// Current memory pool status: `(pooled_buffer_count, allocated, peak)`.
    pub async fn get_memory_pool_status(&self) -> (usize, u64, u64) {
        let pool = self.memory_pool.lock().await;
        let (allocated, peak) = pool.get_memory_stats();
        (pool.available_buffers.len(), allocated, peak)
    }
}
452
/// CPU fallback implementations when GPU is not available.
///
/// Zero-sized stand-in compiled when the `gpu` feature is off; its methods
/// delegate to the CPU helpers from `crate::models::common`.
#[cfg(not(feature = "gpu"))]
pub struct GpuEmbeddingAccelerator;
456
457#[cfg(not(feature = "gpu"))]
458impl GpuEmbeddingAccelerator {
459    pub fn new(_device_id: u32) -> Result<Self> {
460        Ok(Self)
461    }
462
463    /// Fallback to CPU implementation
464    pub fn batch_l2_distances_gpu(
465        &self,
466        vectors_a: &[Array1<f64>],
467        vectors_b: &[Array1<f64>],
468    ) -> Result<Vec<f64>> {
469        Ok(batch_l2_distances(vectors_a, vectors_b))
470    }
471
472    /// Fallback to CPU implementation
473    pub fn cosine_similarity_matrix_gpu(&self, vectors: &[Array1<f64>]) -> Result<Array2<f64>> {
474        Ok(pairwise_distances(vectors))
475    }
476
477    /// Fallback to CPU implementation
478    pub fn batch_gradient_update_gpu(
479        &self,
480        embeddings: &mut [Array2<f64>],
481        gradients: &[Array2<f64>],
482        learning_rate: f64,
483        l2_reg: f64,
484    ) -> Result<()> {
485        batch_gradient_update(embeddings, gradients, learning_rate, l2_reg);
486        Ok(())
487    }
488
489    /// Fallback to CPU implementation
490    pub fn xavier_init_gpu(
491        &self,
492        shapes: &[(usize, usize)],
493        fan_in: usize,
494        fan_out: usize,
495        _seed: u64,
496    ) -> Result<Vec<Array2<f64>>> {
497        let mut rng = Random::default();
498        Ok(batch_xavier_init(shapes, fan_in, fan_out, &mut rng))
499    }
500
501    pub fn device_info(&self) -> String {
502        "CPU (GPU acceleration not available)".to_string()
503    }
504
505    pub fn available_memory(&self) -> Result<u64> {
506        // Return available system RAM as approximation
507        Ok(8 * 1024 * 1024 * 1024) // 8GB default
508    }
509}
510
/// Adaptive acceleration that chooses between GPU and CPU based on problem size.
pub struct AdaptiveEmbeddingAccelerator {
    // Present only when a GPU device was requested, the `gpu` feature is
    // enabled, and initialization succeeded; otherwise `None` and all work
    // runs on the CPU.
    gpu_accelerator: Option<GpuEmbeddingAccelerator>,
    // Minimum problem size (element count) before the GPU path is preferred.
    gpu_threshold: usize,
}
516
517impl AdaptiveEmbeddingAccelerator {
518    /// Create adaptive accelerator with optional GPU support
519    pub fn new(device_id: Option<u32>, gpu_threshold: usize) -> Result<Self> {
520    #[allow(unused_variables)]
521        let gpu_accelerator = if let Some(id) = device_id {
522            #[cfg(feature = "gpu")]
523            {
524                GpuEmbeddingAccelerator::new(id).ok()
525            }
526            #[cfg(not(feature = "gpu"))]
527            {
528                None
529            }
530        } else {
531            None
532        };
533
534        Ok(Self {
535            gpu_accelerator,
536            gpu_threshold,
537        })
538    }
539
540    /// Intelligently choose between GPU and CPU for distance computation
541    pub fn adaptive_batch_distances(
542        &self,
543        vectors_a: &[Array1<f64>],
544        vectors_b: &[Array1<f64>],
545    ) -> Result<Vec<f64>> {
546        if self.should_use_gpu(vectors_a.len() * vectors_b.len()) {
547            if let Some(ref gpu) = self.gpu_accelerator {
548                return gpu
549                    .batch_l2_distances_gpu(vectors_a, vectors_b)
550                    .map_err(|e| anyhow::anyhow!("GPU error: {:?}", e));
551            }
552        }
553
554        // Fallback to optimized CPU implementation
555        Ok(batch_l2_distances(vectors_a, vectors_b))
556    }
557
558    /// Intelligently choose between GPU and CPU for gradient updates
559    pub fn adaptive_gradient_update(
560        &self,
561        embeddings: &mut [Array2<f64>],
562        gradients: &[Array2<f64>],
563        learning_rate: f64,
564        l2_reg: f64,
565    ) -> Result<()> {
566        let total_elements: usize = embeddings.iter().map(|e| e.len()).sum();
567
568        if self.should_use_gpu(total_elements) {
569            if let Some(ref gpu) = self.gpu_accelerator {
570                return gpu
571                    .batch_gradient_update_gpu(embeddings, gradients, learning_rate, l2_reg)
572                    .map_err(|e| anyhow::anyhow!("GPU error: {:?}", e));
573            }
574        }
575
576        // Fallback to optimized CPU implementation
577        batch_gradient_update(embeddings, gradients, learning_rate, l2_reg);
578        Ok(())
579    }
580
581    /// Check if GPU should be used based on problem size
582    fn should_use_gpu(&self, problem_size: usize) -> bool {
583        self.gpu_accelerator.is_some() && problem_size >= self.gpu_threshold
584    }
585
586    /// Get acceleration info
587    pub fn info(&self) -> String {
588        match &self.gpu_accelerator {
589            Some(gpu) => format!(
590                "Adaptive: {} (threshold: {})",
591                gpu.device_info(),
592                self.gpu_threshold
593            ),
594            None => format!("Adaptive: CPU only (threshold: {})", self.gpu_threshold),
595        }
596    }
597}
598
#[cfg(test)]
mod tests {
    use super::*;

    // Without a device id the adaptive accelerator must be CPU-only.
    #[test]
    fn test_adaptive_accelerator_creation() {
        let acc = AdaptiveEmbeddingAccelerator::new(None, 1000).unwrap();
        assert!(acc.info().contains("CPU only"));
    }

    // Small inputs stay below the GPU threshold, exercising the CPU path.
    #[test]
    fn test_fallback_distance_computation() {
        let acc = AdaptiveEmbeddingAccelerator::new(None, 1000).unwrap();

        let lhs: Vec<Array1<f64>> = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
            .iter()
            .map(|v| Array1::from_vec(v.to_vec()))
            .collect();
        let rhs: Vec<Array1<f64>> = [[7.0, 8.0, 9.0], [10.0, 11.0, 12.0]]
            .iter()
            .map(|v| Array1::from_vec(v.to_vec()))
            .collect();

        let distances = acc.adaptive_batch_distances(&lhs, &rhs).unwrap();
        assert_eq!(distances.len(), 4); // 2x2 combinations
    }

    // A non-zero gradient must move the (initially zero) embeddings.
    #[test]
    fn test_fallback_gradient_update() {
        let acc = AdaptiveEmbeddingAccelerator::new(None, 1000).unwrap();

        let mut embeddings = vec![Array2::zeros((2, 3))];
        let gradients = vec![Array2::ones((2, 3))];

        acc.adaptive_gradient_update(&mut embeddings, &gradients, 0.01, 0.001)
            .unwrap();

        assert!(embeddings[0][[0, 0]] != 0.0);
    }

    // Only runs when the `gpu` feature is enabled; tolerates absent hardware.
    #[cfg(feature = "gpu")]
    #[test]
    fn test_gpu_accelerator_creation() {
        if let Ok(gpu) = GpuEmbeddingAccelerator::new(0) {
            println!("GPU Accelerator: {}", gpu.device_info());
            let memory = gpu.available_memory().unwrap_or(0);
            println!("Available GPU Memory: {} MB", memory / (1024 * 1024));
        } else {
            println!("GPU not available for testing");
        }
    }
}