1use crate::models::common::*;
7use anyhow::Result;
8use scirs2_core::ndarray_ext::{Array1, Array2};
9use scirs2_core::random::Random;
10
11#[cfg(feature = "gpu")]
12use scirs2_linalg::gpu::{GpuArray, GpuContext, GpuError};
13
/// FIFO pool of reusable device-side `f32` buffers, all of one fixed size.
#[cfg(feature = "gpu")]
#[derive(Debug)]
pub struct GpuMemoryPool {
    /// Buffers currently available for checkout (handed out oldest-first).
    available_buffers: VecDeque<GpuArray<f32>>,
    /// Element count every pooled buffer must have; mismatched buffers are
    /// rejected on return.
    buffer_size: usize,
    /// Running allocation counter — NOTE(review): never incremented in the
    /// visible code, so it always reads 0; confirm who should update it.
    total_allocated: AtomicU64,
    /// High-water mark of `total_allocated` — NOTE(review): also never
    /// updated in the visible code.
    peak_usage: AtomicU64,
}
23
/// Tuning knobs for adaptive GPU batch sizing.
#[cfg(feature = "gpu")]
#[derive(Debug, Clone)]
pub struct AdaptiveBatchConfig {
    /// Smallest batch the controller will ever choose.
    pub min_batch_size: usize,
    /// Largest batch the controller will ever choose.
    pub max_batch_size: usize,
    /// Desired fraction of GPU busy time (0.0–1.0).
    /// NOTE(review): not consulted by the visible code — confirm intended use.
    pub target_gpu_utilization: f32,
    /// Memory-usage fraction above which batches should shrink.
    /// NOTE(review): not consulted by the visible code — confirm intended use.
    pub memory_usage_threshold: f32,
}
33
/// GPU-backed accelerator for embedding math (batched distances, cosine
/// similarity, gradient updates) with adaptive batch sizing and a pooled
/// device-buffer cache.
#[cfg(feature = "gpu")]
pub struct GpuEmbeddingAccelerator {
    /// Handle to the underlying GPU device context.
    context: GpuContext,
    /// Device ordinal this accelerator was opened on.
    device_id: u32,
    /// Shared pool of reusable device buffers.
    memory_pool: Arc<Mutex<GpuMemoryPool>>,
    /// Static limits for the adaptive batch-size controller.
    batch_config: AdaptiveBatchConfig,
    /// Accumulated runtime statistics, shared across tasks.
    performance_stats: Arc<RwLock<GpuPerformanceStats>>,
    /// Current adaptive batch size (kept as u64 for atomic access).
    optimal_batch_size: Arc<AtomicU64>,
}
44
/// Counters accumulated while the accelerator runs.
#[cfg(feature = "gpu")]
#[derive(Debug, Default)]
pub struct GpuPerformanceStats {
    /// Number of adaptive batch-processing runs completed.
    pub total_operations: u64,
    /// Wall-clock time summed over all runs.
    pub total_compute_time: Duration,
    /// Host<->device transfer count — NOTE(review): never incremented in the
    /// visible code.
    pub memory_transfers: u64,
    /// Cache hit counter — NOTE(review): never incremented in the visible code.
    pub cache_hits: u64,
    /// Cache miss counter — NOTE(review): never incremented in the visible code.
    pub cache_misses: u64,
    /// Blend of previous value and the latest batch size (not a true mean).
    pub average_batch_size: f32,
    /// Utilization measured on the most recent run, as a percentage.
    pub gpu_utilization_percentage: f32,
}
57
/// Point-in-time summary derived from `GpuPerformanceStats` and the pool.
#[cfg(feature = "gpu")]
#[derive(Debug)]
pub struct GpuPerformanceReport {
    /// Device ordinal the report describes.
    pub device_id: u32,
    /// Total adaptive runs since creation or the last reset.
    pub total_operations: u64,
    /// Mean wall-clock time per run (zero when no runs occurred).
    pub average_compute_time: Duration,
    /// Most recent GPU utilization percentage.
    pub gpu_utilization: f32,
    /// Pool-reported allocated memory, in MiB.
    pub memory_allocated_mb: f64,
    /// Pool-reported peak memory, in MiB.
    pub memory_peak_mb: f64,
    /// `cache_hits / (cache_hits + cache_misses)`, or 0 when no lookups.
    pub cache_hit_rate: f32,
    /// Current adaptive batch size.
    pub optimal_batch_size: usize,
}
71
#[cfg(feature = "gpu")]
impl GpuMemoryPool {
    /// Builds an empty pool for buffers of `buffer_size` elements, reserving
    /// queue capacity for `initial_pool_size` entries up front. No buffers
    /// are allocated here; they enter the pool via `return_buffer`.
    pub fn new(buffer_size: usize, initial_pool_size: usize) -> Self {
        let available_buffers = VecDeque::with_capacity(initial_pool_size);
        Self {
            available_buffers,
            buffer_size,
            total_allocated: AtomicU64::new(0),
            peak_usage: AtomicU64::new(0),
        }
    }

    /// Hands out the oldest pooled buffer, or `None` when the pool is empty.
    pub fn get_buffer(&mut self) -> Option<GpuArray<f32>> {
        self.available_buffers.pop_front()
    }

    /// Accepts a buffer back into the pool. Buffers whose length does not
    /// match the pool's configured size are silently dropped (deallocated).
    pub fn return_buffer(&mut self, buffer: GpuArray<f32>) {
        if buffer.len() != self.buffer_size {
            return; // wrong size: let it fall out of scope
        }
        self.available_buffers.push_back(buffer);
    }

    /// Returns the `(total_allocated, peak_usage)` counters.
    /// NOTE(review): nothing in this impl ever increments these, so both
    /// currently always read 0 — confirm whether callers update them.
    pub fn get_memory_stats(&self) -> (u64, u64) {
        let allocated = self.total_allocated.load(Ordering::Relaxed);
        let peak = self.peak_usage.load(Ordering::Relaxed);
        (allocated, peak)
    }
}
101
#[cfg(feature = "gpu")]
impl GpuEmbeddingAccelerator {
    /// Opens a GPU context on `device_id` and initializes the buffer pool,
    /// adaptive batching configuration, and performance counters.
    ///
    /// # Errors
    /// Returns a `GpuError` if the device context cannot be created.
    pub fn new(device_id: u32) -> Result<Self, GpuError> {
        let context = GpuContext::new(device_id)?;

        // Reusable device buffers: 1M f32 elements each, queue capacity 10.
        let memory_pool = Arc::new(Mutex::new(GpuMemoryPool::new(1024 * 1024, 10)));
        let batch_config = AdaptiveBatchConfig {
            min_batch_size: 32,
            max_batch_size: 8192,
            target_gpu_utilization: 0.85,
            memory_usage_threshold: 0.8,
        };

        Ok(Self {
            context,
            device_id,
            memory_pool,
            batch_config,
            performance_stats: Arc::new(RwLock::new(GpuPerformanceStats::default())),
            // Starting guess; refined at runtime by update_batch_size_feedback.
            optimal_batch_size: Arc::new(AtomicU64::new(512)),
        })
    }

    /// Returns the current adaptive batch size clamped to
    /// `[min_batch_size, min(max_batch_size, data_size)]`.
    pub async fn get_optimal_batch_size(&self, data_size: usize) -> usize {
        let optimal = self.optimal_batch_size.load(Ordering::Relaxed) as usize;
        let config_min = self.batch_config.min_batch_size;
        let config_max = self.batch_config.max_batch_size;

        optimal.clamp(config_min, config_max.min(data_size))
    }

    /// Adjusts the stored optimal batch size from a performance score in
    /// `[0, 1]`: grow 10% when the score exceeds 0.8, shrink 10% below 0.5,
    /// otherwise leave it unchanged. The result is clamped to the configured
    /// min/max range.
    ///
    /// NOTE(review): `batch_size` is currently unused — the adjustment is
    /// applied to the stored optimum rather than the batch size that produced
    /// the score. Confirm this is intentional.
    pub async fn update_batch_size_feedback(&self, batch_size: usize, performance_score: f32) {
        let current_optimal = self.optimal_batch_size.load(Ordering::Relaxed) as usize;

        let new_optimal = if performance_score > 0.8 {
            // Good utilization: try larger batches.
            (current_optimal as f32 * 1.1).round() as usize
        } else if performance_score < 0.5 {
            // Poor utilization: back off.
            (current_optimal as f32 * 0.9).round() as usize
        } else {
            current_optimal
        };

        let clamped_optimal = new_optimal.clamp(
            self.batch_config.min_batch_size,
            self.batch_config.max_batch_size,
        );

        self.optimal_batch_size
            .store(clamped_optimal as u64, Ordering::Relaxed);
    }

    /// Computes batched L2 distances between two vector sets on the GPU and
    /// downloads the results to the host.
    ///
    /// # Errors
    /// Propagates any `GpuError` from upload, kernel launch, or download.
    pub fn batch_l2_distances_gpu(
        &self,
        vectors_a: &[Array1<f64>],
        vectors_b: &[Array1<f64>],
    ) -> Result<Vec<f64>, GpuError> {
        let gpu_a = self.upload_vectors_to_gpu(vectors_a)?;
        let gpu_b = self.upload_vectors_to_gpu(vectors_b)?;

        let gpu_distances = gpu_a.batch_l2_distance(&gpu_b)?;

        let distances = gpu_distances.download_to_host()?;
        Ok(distances)
    }

    /// Computes the full pairwise cosine-similarity matrix of `vectors` on
    /// the GPU.
    pub fn cosine_similarity_matrix_gpu(
        &self,
        vectors: &[Array1<f64>],
    ) -> Result<Array2<f64>, GpuError> {
        let gpu_vectors = self.upload_vectors_to_gpu(vectors)?;
        let gpu_similarity_matrix = gpu_vectors.cosine_similarity_matrix()?;
        let similarity_matrix = gpu_similarity_matrix.download_to_host_array2()?;
        Ok(similarity_matrix)
    }

    /// Applies one gradient step with L2 regularization to each embedding
    /// matrix on the GPU, then copies the updated values back to the host.
    ///
    /// Embeddings and gradients are paired positionally; extra entries in the
    /// longer slice are ignored (zip semantics).
    pub fn batch_gradient_update_gpu(
        &self,
        embeddings: &mut [Array2<f64>],
        gradients: &[Array2<f64>],
        learning_rate: f64,
        l2_reg: f64,
    ) -> Result<(), GpuError> {
        for (embedding, gradient) in embeddings.iter_mut().zip(gradients.iter()) {
            let mut gpu_embedding = self.context.upload_array2(embedding)?;
            let gpu_gradient = self.context.upload_array2(gradient)?;

            gpu_embedding.gradient_update(&gpu_gradient, learning_rate, l2_reg)?;

            *embedding = gpu_embedding.download_to_host_array2()?;
        }
        Ok(())
    }

    /// Processes `data` in adaptively sized chunks, feeding the measured
    /// utilization back into the batch-size controller and the shared
    /// performance statistics.
    pub async fn adaptive_batch_processing<T, R>(
        &self,
        data: &[T],
        mut process_fn: impl FnMut(&[T]) -> Result<Vec<R>, GpuError>,
    ) -> Result<Vec<R>, GpuError> {
        let start_time = Instant::now();
        let batch_size = self.get_optimal_batch_size(data.len()).await;

        let mut results = Vec::with_capacity(data.len());
        let mut total_processing_time = Duration::ZERO;

        for chunk in data.chunks(batch_size) {
            let chunk_start = Instant::now();
            let chunk_results = process_fn(chunk)?;
            let chunk_time = chunk_start.elapsed();

            results.extend(chunk_results);
            total_processing_time += chunk_time;
        }

        let total_time = start_time.elapsed();
        // FIX: guard the division — a zero-duration run previously produced
        // NaN utilization, which would poison the feedback controller.
        let gpu_utilization = if total_time.is_zero() {
            0.0
        } else {
            total_processing_time.as_secs_f32() / total_time.as_secs_f32()
        };
        let performance_score = gpu_utilization.min(1.0);

        self.update_batch_size_feedback(batch_size, performance_score)
            .await;

        let mut stats = self.performance_stats.write().await;
        stats.total_operations += 1;
        stats.total_compute_time += total_time;
        stats.gpu_utilization_percentage = gpu_utilization * 100.0;
        // Blend of previous value and this run's batch size (not a true mean).
        stats.average_batch_size = (stats.average_batch_size + batch_size as f32) / 2.0;

        Ok(results)
    }

    /// Multiplies two f32 matrices on the GPU, reusing pooled device buffers
    /// for the inputs when available.
    ///
    /// NOTE(review): pooled buffers have a fixed element count while `a` and
    /// `b` may be any size; `copy_from_host` is assumed to error on a size
    /// mismatch — confirm against the GPU API. `as_slice().unwrap()` assumes
    /// the input arrays are in standard contiguous layout.
    pub async fn optimized_matrix_multiply(
        &self,
        a: &Array2<f32>,
        b: &Array2<f32>,
    ) -> Result<Array2<f32>, GpuError> {
        let mut pool = self.memory_pool.lock().await;

        let gpu_a = match pool.get_buffer() {
            Some(mut buffer) => {
                buffer.copy_from_host(a.as_slice().unwrap())?;
                buffer
            }
            None => self.context.upload_array2_f32(a)?,
        };

        let gpu_b = match pool.get_buffer() {
            Some(mut buffer) => {
                buffer.copy_from_host(b.as_slice().unwrap())?;
                buffer
            }
            None => self.context.upload_array2_f32(b)?,
        };

        // FIX: run the multiply and download without `?` so the input buffers
        // are returned to the pool even on failure. Previously an error here
        // dropped both buffers and silently shrank the pool.
        let result = gpu_a
            .matrix_multiply(&gpu_b)
            .and_then(|gpu_result| gpu_result.download_to_host_array2());

        pool.return_buffer(gpu_a);
        pool.return_buffer(gpu_b);

        result
    }

    /// Finds the `top_k` most similar database embeddings to the query by
    /// cosine similarity, processing the database in adaptive batches.
    ///
    /// Returns `(database_index, similarity)` pairs, most similar first.
    pub async fn gpu_embedding_search(
        &self,
        query_embedding: &Array1<f32>,
        database_embeddings: &[Array1<f32>],
        top_k: usize,
    ) -> Result<Vec<(usize, f32)>, GpuError> {
        let batch_size = self.get_optimal_batch_size(database_embeddings.len()).await;
        let mut all_similarities = Vec::with_capacity(database_embeddings.len());

        for (batch_idx, batch) in database_embeddings.chunks(batch_size).enumerate() {
            let similarities = self
                .compute_batch_similarities(query_embedding, batch)
                .await?;

            for (local_idx, similarity) in similarities.iter().enumerate() {
                // Translate the within-batch index back to a database index.
                let global_idx = batch_idx * batch_size + local_idx;
                all_similarities.push((global_idx, *similarity));
            }
        }

        // Sort descending by similarity. FIX: `total_cmp` imposes a total
        // order on f32, so a NaN similarity no longer panics (this previously
        // used `partial_cmp(..).unwrap()`).
        all_similarities.sort_by(|a, b| b.1.total_cmp(&a.1));
        all_similarities.truncate(top_k);

        Ok(all_similarities)
    }

    /// Uploads the query and one batch of database vectors, then computes
    /// their cosine similarities on the GPU.
    async fn compute_batch_similarities(
        &self,
        query: &Array1<f32>,
        batch: &[Array1<f32>],
    ) -> Result<Vec<f32>, GpuError> {
        let gpu_query = self.context.upload_array1_f32(query)?;
        // FIX: was `self.upload_batch_to_gpu(batch)`, which is not defined in
        // this impl; `upload_f32_vectors_to_gpu` has the intended signature.
        // Confirm no other impl block elsewhere defines `upload_batch_to_gpu`.
        let gpu_batch = self.upload_f32_vectors_to_gpu(batch)?;

        let gpu_similarities = gpu_query.batch_cosine_similarity(&gpu_batch)?;
        let similarities = gpu_similarities.download_to_host()?;

        Ok(similarities)
    }

    /// Initializes matrices with Xavier/Glorot uniform values drawn on the
    /// GPU, with limit `sqrt(6 / (fan_in + fan_out))`.
    ///
    /// NOTE(review): the same `seed` is passed for every shape, so equal
    /// shapes presumably receive identical matrices — confirm whether the
    /// seed should be advanced per matrix.
    pub fn xavier_init_gpu(
        &self,
        shapes: &[(usize, usize)],
        fan_in: usize,
        fan_out: usize,
        seed: u64,
    ) -> Result<Vec<Array2<f64>>, GpuError> {
        let limit = (6.0 / (fan_in + fan_out) as f64).sqrt();

        let mut results = Vec::with_capacity(shapes.len());
        for &shape in shapes {
            let gpu_array = self
                .context
                .random_uniform_array2(shape, -limit, limit, seed)?;
            let host_array = gpu_array.download_to_host_array2()?;
            results.push(host_array);
        }
        Ok(results)
    }

    /// Runs one contrastive-learning update over the entity embeddings on the
    /// GPU and writes the updated vectors back into `entity_embeddings`.
    /// Returns the loss reported by the kernel.
    pub fn contrastive_learning_gpu(
        &self,
        entity_embeddings: &mut [Array1<f32>],
        similarity_pairs: &[(usize, usize)],
        negative_samples: &[(usize, usize)],
        temperature: f32,
        learning_rate: f32,
    ) -> Result<f32, GpuError> {
        let mut gpu_embeddings = self.upload_f32_vectors_to_gpu(entity_embeddings)?;

        let loss = gpu_embeddings.contrastive_learning_update(
            similarity_pairs,
            negative_samples,
            temperature,
            learning_rate,
        )?;

        // Download every (possibly updated) embedding back to the host.
        for (i, embedding) in entity_embeddings.iter_mut().enumerate() {
            *embedding = gpu_embeddings.get_vector(i)?.download_to_host_array1()?;
        }

        Ok(loss)
    }

    /// Uploads a batch of f64 vectors as one contiguous GPU array.
    fn upload_vectors_to_gpu(&self, vectors: &[Array1<f64>]) -> Result<GpuArray<f64>, GpuError> {
        self.context.upload_vector_batch(vectors)
    }

    /// Uploads a batch of f32 vectors as one contiguous GPU array.
    fn upload_f32_vectors_to_gpu(
        &self,
        vectors: &[Array1<f32>],
    ) -> Result<GpuArray<f32>, GpuError> {
        self.context.upload_f32_vector_batch(vectors)
    }

    /// Human-readable device identification string.
    pub fn device_info(&self) -> String {
        format!(
            "GPU Device {}: {}",
            self.device_id,
            self.context.device_name()
        )
    }

    /// Free device memory in bytes, as reported by the context.
    pub fn available_memory(&self) -> Result<u64, GpuError> {
        self.context.available_memory()
    }

    /// Snapshot of accumulated performance statistics and pool memory usage.
    ///
    /// NOTE(review): `memory_transfers`, `cache_hits`, and `cache_misses` are
    /// never incremented in this impl, so the derived cache-hit rate is
    /// currently always 0 — confirm who is expected to update them.
    pub async fn get_performance_report(&self) -> GpuPerformanceReport {
        let stats = self.performance_stats.read().await;
        let (allocated, peak) = {
            let pool = self.memory_pool.lock().await;
            pool.get_memory_stats()
        };

        GpuPerformanceReport {
            device_id: self.device_id,
            total_operations: stats.total_operations,
            average_compute_time: if stats.total_operations > 0 {
                stats.total_compute_time / stats.total_operations as u32
            } else {
                Duration::ZERO
            },
            gpu_utilization: stats.gpu_utilization_percentage,
            memory_allocated_mb: allocated as f64 / (1024.0 * 1024.0),
            memory_peak_mb: peak as f64 / (1024.0 * 1024.0),
            cache_hit_rate: if stats.cache_hits + stats.cache_misses > 0 {
                stats.cache_hits as f32 / (stats.cache_hits + stats.cache_misses) as f32
            } else {
                0.0
            },
            optimal_batch_size: self.optimal_batch_size.load(Ordering::Relaxed) as usize,
        }
    }

    /// Clears all accumulated statistics and resets the adaptive batch size
    /// to its initial value of 512.
    pub async fn reset_performance_stats(&self) {
        let mut stats = self.performance_stats.write().await;
        *stats = GpuPerformanceStats::default();
        self.optimal_batch_size.store(512, Ordering::Relaxed);
    }

    /// Returns `(pooled_buffer_count, total_allocated, peak_usage)`.
    pub async fn get_memory_pool_status(&self) -> (usize, u64, u64) {
        let pool = self.memory_pool.lock().await;
        let (allocated, peak) = pool.get_memory_stats();
        (pool.available_buffers.len(), allocated, peak)
    }
}
452
/// Zero-sized CPU stand-in compiled when the `gpu` feature is disabled; its
/// methods mirror the GPU API but delegate to host-side helpers.
#[cfg(not(feature = "gpu"))]
pub struct GpuEmbeddingAccelerator;
456
457#[cfg(not(feature = "gpu"))]
458impl GpuEmbeddingAccelerator {
459 pub fn new(_device_id: u32) -> Result<Self> {
460 Ok(Self)
461 }
462
463 pub fn batch_l2_distances_gpu(
465 &self,
466 vectors_a: &[Array1<f64>],
467 vectors_b: &[Array1<f64>],
468 ) -> Result<Vec<f64>> {
469 Ok(batch_l2_distances(vectors_a, vectors_b))
470 }
471
472 pub fn cosine_similarity_matrix_gpu(&self, vectors: &[Array1<f64>]) -> Result<Array2<f64>> {
474 Ok(pairwise_distances(vectors))
475 }
476
477 pub fn batch_gradient_update_gpu(
479 &self,
480 embeddings: &mut [Array2<f64>],
481 gradients: &[Array2<f64>],
482 learning_rate: f64,
483 l2_reg: f64,
484 ) -> Result<()> {
485 batch_gradient_update(embeddings, gradients, learning_rate, l2_reg);
486 Ok(())
487 }
488
489 pub fn xavier_init_gpu(
491 &self,
492 shapes: &[(usize, usize)],
493 fan_in: usize,
494 fan_out: usize,
495 _seed: u64,
496 ) -> Result<Vec<Array2<f64>>> {
497 let mut rng = Random::default();
498 Ok(batch_xavier_init(shapes, fan_in, fan_out, &mut rng))
499 }
500
501 pub fn device_info(&self) -> String {
502 "CPU (GPU acceleration not available)".to_string()
503 }
504
505 pub fn available_memory(&self) -> Result<u64> {
506 Ok(8 * 1024 * 1024 * 1024) }
509}
510
/// Routes embedding workloads between the GPU accelerator (when present and
/// the problem is large enough) and CPU helper functions.
pub struct AdaptiveEmbeddingAccelerator {
    /// GPU backend, if one could be initialized.
    gpu_accelerator: Option<GpuEmbeddingAccelerator>,
    /// Minimum problem size (element count) before the GPU path is used.
    gpu_threshold: usize,
}
516
517impl AdaptiveEmbeddingAccelerator {
518 pub fn new(device_id: Option<u32>, gpu_threshold: usize) -> Result<Self> {
520 #[allow(unused_variables)]
521 let gpu_accelerator = if let Some(id) = device_id {
522 #[cfg(feature = "gpu")]
523 {
524 GpuEmbeddingAccelerator::new(id).ok()
525 }
526 #[cfg(not(feature = "gpu"))]
527 {
528 None
529 }
530 } else {
531 None
532 };
533
534 Ok(Self {
535 gpu_accelerator,
536 gpu_threshold,
537 })
538 }
539
540 pub fn adaptive_batch_distances(
542 &self,
543 vectors_a: &[Array1<f64>],
544 vectors_b: &[Array1<f64>],
545 ) -> Result<Vec<f64>> {
546 if self.should_use_gpu(vectors_a.len() * vectors_b.len()) {
547 if let Some(ref gpu) = self.gpu_accelerator {
548 return gpu
549 .batch_l2_distances_gpu(vectors_a, vectors_b)
550 .map_err(|e| anyhow::anyhow!("GPU error: {:?}", e));
551 }
552 }
553
554 Ok(batch_l2_distances(vectors_a, vectors_b))
556 }
557
558 pub fn adaptive_gradient_update(
560 &self,
561 embeddings: &mut [Array2<f64>],
562 gradients: &[Array2<f64>],
563 learning_rate: f64,
564 l2_reg: f64,
565 ) -> Result<()> {
566 let total_elements: usize = embeddings.iter().map(|e| e.len()).sum();
567
568 if self.should_use_gpu(total_elements) {
569 if let Some(ref gpu) = self.gpu_accelerator {
570 return gpu
571 .batch_gradient_update_gpu(embeddings, gradients, learning_rate, l2_reg)
572 .map_err(|e| anyhow::anyhow!("GPU error: {:?}", e));
573 }
574 }
575
576 batch_gradient_update(embeddings, gradients, learning_rate, l2_reg);
578 Ok(())
579 }
580
581 fn should_use_gpu(&self, problem_size: usize) -> bool {
583 self.gpu_accelerator.is_some() && problem_size >= self.gpu_threshold
584 }
585
586 pub fn info(&self) -> String {
588 match &self.gpu_accelerator {
589 Some(gpu) => format!(
590 "Adaptive: {} (threshold: {})",
591 gpu.device_info(),
592 self.gpu_threshold
593 ),
594 None => format!("Adaptive: CPU only (threshold: {})", self.gpu_threshold),
595 }
596 }
597}
598
#[cfg(test)]
mod tests {
    use super::*;

    // A CPU-only accelerator must report itself as such.
    #[test]
    fn test_adaptive_accelerator_creation() {
        let accelerator = AdaptiveEmbeddingAccelerator::new(None, 1000).unwrap();
        assert!(accelerator.info().contains("CPU only"));
    }

    // With no GPU configured, distances come from the CPU fallback path.
    #[test]
    fn test_fallback_distance_computation() {
        let accelerator = AdaptiveEmbeddingAccelerator::new(None, 1000).unwrap();

        let lhs = vec![
            Array1::from_vec(vec![1.0, 2.0, 3.0]),
            Array1::from_vec(vec![4.0, 5.0, 6.0]),
        ];
        let rhs = vec![
            Array1::from_vec(vec![7.0, 8.0, 9.0]),
            Array1::from_vec(vec![10.0, 11.0, 12.0]),
        ];

        let distances = accelerator.adaptive_batch_distances(&lhs, &rhs).unwrap();
        // Two vectors against two vectors yields four distances.
        assert_eq!(distances.len(), 4);
    }

    // The CPU gradient update must actually modify the embeddings in place.
    #[test]
    fn test_fallback_gradient_update() {
        let accelerator = AdaptiveEmbeddingAccelerator::new(None, 1000).unwrap();

        let mut embeddings = vec![Array2::zeros((2, 3))];
        let gradients = vec![Array2::ones((2, 3))];

        accelerator
            .adaptive_gradient_update(&mut embeddings, &gradients, 0.01, 0.001)
            .unwrap();

        assert!(embeddings[0][[0, 0]] != 0.0);
    }

    // Hardware smoke test; only compiled (and meaningful) with `gpu` enabled.
    #[cfg(feature = "gpu")]
    #[test]
    fn test_gpu_accelerator_creation() {
        if let Ok(gpu) = GpuEmbeddingAccelerator::new(0) {
            println!("GPU Accelerator: {}", gpu.device_info());
            let memory = gpu.available_memory().unwrap_or(0);
            println!("Available GPU Memory: {} MB", memory / (1024 * 1024));
        } else {
            println!("GPU not available for testing");
        }
    }
}