use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, warn};
use scirs2_core::gpu::{GpuBackend as ScirsGpuBackend, GpuContext as ScirsGpuContext};
/// GPU compute context backed by SciRS2, with transparent CPU fallback
/// when no GPU backend could be initialized.
pub struct GpuContext {
    /// Backend requested at construction time (CPU may still be in use —
    /// check `is_available`).
    backend: GpuBackend,
    /// Tuning knobs; stored at construction but not yet read by the
    /// compute paths visible in this file.
    config: GpuConfig,
    /// Shared operation counters, updated by every compute method.
    stats: Arc<RwLock<GpuStats>>,
    /// Underlying SciRS2 context; `None` means CPU-only operation.
    #[allow(dead_code)]
    scirs_context: Option<ScirsGpuContext>,
}
impl GpuContext {
    /// Creates a context for the requested backend.
    ///
    /// Backend probing is best-effort: if SciRS2 cannot initialize the
    /// requested GPU, the context is still created with `scirs_context = None`
    /// and every operation falls back to the CPU. This constructor therefore
    /// never fails on probe errors.
    pub fn new(backend: GpuBackend) -> Result<Self> {
        let config = GpuConfig::default();
        let scirs_context = match backend {
            GpuBackend::Cuda => {
                debug!("Initializing CUDA backend");
                ScirsGpuContext::new(ScirsGpuBackend::Cuda).ok()
            }
            GpuBackend::Metal => {
                debug!("Initializing Metal backend");
                ScirsGpuContext::new(ScirsGpuBackend::Metal).ok()
            }
            GpuBackend::Cpu => {
                debug!("Using CPU fallback");
                None
            }
            GpuBackend::Auto => {
                // Prefer CUDA, then Metal; `None` means CPU-only.
                ScirsGpuContext::new(ScirsGpuBackend::Cuda)
                    .or_else(|_| ScirsGpuContext::new(ScirsGpuBackend::Metal))
                    .ok()
            }
        };
        Ok(Self {
            backend,
            config,
            stats: Arc::new(RwLock::new(GpuStats::default())),
            scirs_context,
        })
    }

    /// True when a real SciRS2 GPU context was acquired.
    pub fn is_available(&self) -> bool {
        self.scirs_context.is_some()
    }

    /// Backend requested at construction.
    pub fn backend(&self) -> GpuBackend {
        self.backend
    }

    /// Processes one batch, recording whether it ran on GPU or CPU.
    ///
    /// NOTE(review): the "GPU" path is currently an identity pass-through;
    /// no SciRS2 kernel is dispatched yet.
    pub async fn batch_process(&self, data: &[f32]) -> Result<Vec<f32>> {
        let on_gpu = self.scirs_context.is_some();
        // Keep the write lock scoped to the counter updates only.
        {
            let mut stats = self.stats.write().await;
            stats.batches_processed += 1;
            if on_gpu {
                stats.gpu_operations += 1;
            } else {
                stats.cpu_fallbacks += 1;
            }
        }
        if on_gpu {
            debug!("Processing batch on GPU: {} elements", data.len());
        } else {
            warn!("GPU not available, falling back to CPU");
        }
        Ok(data.to_vec())
    }

    /// Multiplies row-major `a` (m x n) by `b` (n x k), returning an m x k
    /// row-major result.
    ///
    /// # Errors
    /// Returns an error when the slice lengths do not match the stated
    /// dimensions (the previous version would panic on out-of-bounds
    /// indexing instead).
    pub async fn matrix_multiply(
        &self,
        a: &[f32],
        b: &[f32],
        m: usize,
        n: usize,
        k: usize,
    ) -> Result<Vec<f32>> {
        if a.len() != m * n || b.len() != n * k {
            return Err(anyhow!(
                "matrix dimension mismatch: a.len()={} (want {}), b.len()={} (want {})",
                a.len(),
                m * n,
                b.len(),
                n * k
            ));
        }
        // Update the counter in a short critical section; the multiply
        // itself runs without holding the stats lock.
        self.stats.write().await.matrix_operations += 1;
        if self.scirs_context.is_some() {
            debug!("GPU matrix multiply: {}x{} * {}x{}", m, n, n, k);
        }
        // GPU and CPU branches were identical CPU loops; collapsed into one
        // shared kernel until SciRS2 dispatch is wired up.
        Ok(Self::matmul_cpu(a, b, m, n, k))
    }

    /// Naive triple-loop GEMM. Loop order (i, l, j) gives sequential access
    /// to `b`; the per-element accumulation order over `l` is unchanged, so
    /// results are bit-identical to the (i, j, l) ordering.
    fn matmul_cpu(a: &[f32], b: &[f32], m: usize, n: usize, k: usize) -> Vec<f32> {
        let mut out = vec![0.0f32; m * k];
        for i in 0..m {
            for l in 0..n {
                let lhs = a[i * n + l];
                for j in 0..k {
                    out[i * k + j] += lhs * b[l * k + j];
                }
            }
        }
        out
    }

    /// Element-wise addition of two equal-length vectors.
    ///
    /// # Errors
    /// Returns an error when the lengths differ.
    pub async fn vector_add(&self, a: &[f32], b: &[f32]) -> Result<Vec<f32>> {
        if a.len() != b.len() {
            return Err(anyhow!("Vector lengths must match"));
        }
        self.stats.write().await.vector_operations += 1;
        // GPU and CPU branches were identical; collapsed.
        Ok(a.iter().zip(b.iter()).map(|(x, y)| x + y).collect())
    }

    /// Sums all elements. Currently a sequential CPU sum on either path;
    /// the name anticipates a future GPU reduction.
    pub async fn parallel_sum(&self, data: &[f32]) -> Result<f32> {
        self.stats.write().await.aggregation_operations += 1;
        Ok(data.iter().sum())
    }

    /// Returns the start index of every exact (f32 `==`) occurrence of
    /// `pattern` in `data`. NaN elements never match.
    ///
    /// Fixes two defects of the previous version: it panicked when
    /// `pattern` was longer than `data` (the `saturating_sub` guard still
    /// allowed one out-of-bounds window), and it reported a match at every
    /// index for an empty pattern. Both cases now return no matches.
    pub async fn pattern_match(&self, data: &[f32], pattern: &[f32]) -> Result<Vec<usize>> {
        self.stats.write().await.pattern_operations += 1;
        if pattern.is_empty() || pattern.len() > data.len() {
            return Ok(Vec::new());
        }
        let mut matches = Vec::new();
        for (i, window) in data.windows(pattern.len()).enumerate() {
            if window == pattern {
                matches.push(i);
            }
        }
        Ok(matches)
    }

    /// Snapshot of the accumulated operation counters.
    pub async fn stats(&self) -> GpuStats {
        self.stats.read().await.clone()
    }
}
/// Selects which compute backend `GpuContext` should try to initialize.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum GpuBackend {
    /// NVIDIA CUDA.
    Cuda,
    /// Apple Metal.
    Metal,
    /// Skip GPU probing entirely and run on the CPU.
    Cpu,
    /// Try CUDA first, then Metal; fall back to CPU if neither initializes.
    Auto,
}
/// Configuration for GPU processing.
///
/// NOTE(review): these knobs are stored on `GpuContext` but not yet
/// consulted by the compute paths visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuConfig {
    /// Master switch for GPU acceleration.
    pub enabled: bool,
    /// Preferred backend (see `GpuBackend`).
    pub backend: GpuBackend,
    /// Number of elements per processing batch.
    pub batch_size: usize,
    /// Device memory budget in bytes (default is 2 GiB).
    pub memory_limit: usize,
    /// Whether to use mixed-precision (e.g. f16) computation.
    pub mixed_precision: bool,
    /// Number of concurrent GPU streams.
    pub num_streams: usize,
}
impl Default for GpuConfig {
fn default() -> Self {
Self {
enabled: true,
backend: GpuBackend::Auto,
batch_size: 1024,
memory_limit: 2 * 1024 * 1024 * 1024, mixed_precision: false,
num_streams: 2,
}
}
}
/// Counters accumulated by `GpuContext` across all operations.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct GpuStats {
    /// Total batches seen by `batch_process`.
    pub batches_processed: u64,
    /// Batches that took the GPU path.
    pub gpu_operations: u64,
    /// Batches that fell back to the CPU path.
    pub cpu_fallbacks: u64,
    /// Calls to `matrix_multiply`.
    pub matrix_operations: u64,
    /// Calls to `vector_add`.
    pub vector_operations: u64,
    /// Calls to `parallel_sum`.
    pub aggregation_operations: u64,
    /// Calls to `pattern_match`.
    pub pattern_operations: u64,
    /// Cumulative GPU time — NOTE(review): never written in this file.
    pub total_gpu_time_ms: f64,
    /// Average GPU time — NOTE(review): never written in this file.
    pub avg_gpu_time_ms: f64,
}
impl GpuStats {
    /// Fraction of dispatched batches that ran on the GPU, in `[0, 1]`.
    /// Returns 0.0 when nothing has been dispatched yet.
    pub fn gpu_utilization(&self) -> f64 {
        Self::fraction(self.gpu_operations, self.gpu_operations + self.cpu_fallbacks)
    }

    /// Fraction of dispatched batches that fell back to the CPU, in `[0, 1]`.
    /// Returns 0.0 when nothing has been dispatched yet.
    pub fn cpu_fallback_rate(&self) -> f64 {
        Self::fraction(self.cpu_fallbacks, self.gpu_operations + self.cpu_fallbacks)
    }

    /// `part / total` as f64, defined as 0.0 for an empty total.
    fn fraction(part: u64, total: u64) -> f64 {
        if total == 0 {
            0.0
        } else {
            part as f64 / total as f64
        }
    }
}
/// Host-side buffer with an optional (simulated) device residency marker.
pub struct GpuBuffer<T> {
    /// Host copy of the payload.
    data: Vec<T>,
    /// Fake device address; `Some` while the buffer is "on the device".
    device_ptr: Option<usize>,
}
impl<T: Clone> GpuBuffer<T> {
pub fn new(data: Vec<T>) -> Self {
Self {
data,
device_ptr: None,
}
}
pub fn to_device(&mut self) -> Result<()> {
self.device_ptr = Some(0x1000); Ok(())
}
pub fn from_device(&mut self) -> Result<()> {
self.device_ptr = None;
Ok(())
}
pub fn is_on_device(&self) -> bool {
self.device_ptr.is_some()
}
pub fn data(&self) -> &[T] {
&self.data
}
}
/// Stream-oriented processing facade over a `GpuContext`.
pub struct GpuStreamProcessor {
    /// Owned compute context (GPU or CPU fallback).
    context: GpuContext,
    /// Batching thresholds; only `min_batch_size` is consulted here.
    config: GpuProcessorConfig,
}
impl GpuStreamProcessor {
pub fn new(backend: GpuBackend, config: GpuProcessorConfig) -> Result<Self> {
Ok(Self {
context: GpuContext::new(backend)?,
config,
})
}
pub async fn process_batch(&self, batch: &[f32]) -> Result<Vec<f32>> {
if batch.len() < self.config.min_batch_size {
return Ok(batch.to_vec());
}
self.context.batch_process(batch).await
}
pub async fn compute_embeddings(&self, inputs: &[f32], weights: &[f32]) -> Result<Vec<f32>> {
let dim = weights.len() / inputs.len();
self.context
.matrix_multiply(inputs, weights, 1, inputs.len(), dim)
.await
}
pub async fn aggregate_metrics(&self, values: &[f32], operation: AggregationOp) -> Result<f32> {
match operation {
AggregationOp::Sum => self.context.parallel_sum(values).await,
AggregationOp::Mean => {
let sum = self.context.parallel_sum(values).await?;
Ok(sum / values.len() as f32)
}
AggregationOp::Max => Ok(values.iter().fold(f32::NEG_INFINITY, |a, &b| a.max(b))),
AggregationOp::Min => Ok(values.iter().fold(f32::INFINITY, |a, &b| a.min(b))),
}
}
pub fn is_gpu_available(&self) -> bool {
self.context.is_available()
}
}
/// Batching configuration for `GpuStreamProcessor`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuProcessorConfig {
    /// Batches below this size bypass the context entirely.
    pub min_batch_size: usize,
    /// Upper batch bound — NOTE(review): not enforced in this file.
    pub max_batch_size: usize,
    /// Async-processing flag — NOTE(review): not consulted in this file.
    pub async_processing: bool,
}
impl Default for GpuProcessorConfig {
fn default() -> Self {
Self {
min_batch_size: 100,
max_batch_size: 10000,
async_processing: true,
}
}
}
/// Reduction applied by `GpuStreamProcessor::aggregate_metrics`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum AggregationOp {
    /// Sum of all values.
    Sum,
    /// Arithmetic mean.
    Mean,
    /// Maximum value.
    Max,
    /// Minimum value.
    Min,
}
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_gpu_context_creation() {
        let ctx = GpuContext::new(GpuBackend::Cpu).unwrap();
        assert_eq!(ctx.backend(), GpuBackend::Cpu);
    }

    #[tokio::test]
    async fn test_batch_processing() {
        // CPU path is a pass-through: output equals input.
        let ctx = GpuContext::new(GpuBackend::Cpu).unwrap();
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let result = ctx.batch_process(&data).await.unwrap();
        assert_eq!(result, data);
    }

    #[tokio::test]
    async fn test_matrix_multiply() {
        let ctx = GpuContext::new(GpuBackend::Cpu).unwrap();
        // [[1, 2], [3, 4]] * [[5, 6], [7, 8]] = [[19, 22], [43, 50]]
        let a = vec![1.0, 2.0, 3.0, 4.0];
        let b = vec![5.0, 6.0, 7.0, 8.0];
        let result = ctx.matrix_multiply(&a, &b, 2, 2, 2).await.unwrap();
        // Pin the actual product, not just the length.
        assert_eq!(result, vec![19.0, 22.0, 43.0, 50.0]);
    }

    #[tokio::test]
    async fn test_vector_add() {
        let ctx = GpuContext::new(GpuBackend::Cpu).unwrap();
        let a = vec![1.0, 2.0, 3.0];
        let b = vec![4.0, 5.0, 6.0];
        let result = ctx.vector_add(&a, &b).await.unwrap();
        assert_eq!(result, vec![5.0, 7.0, 9.0]);
    }

    #[tokio::test]
    async fn test_parallel_sum() {
        let ctx = GpuContext::new(GpuBackend::Cpu).unwrap();
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let sum = ctx.parallel_sum(&data).await.unwrap();
        assert_eq!(sum, 15.0);
    }

    #[tokio::test]
    async fn test_pattern_match() {
        let ctx = GpuContext::new(GpuBackend::Cpu).unwrap();
        let data = vec![1.0, 2.0, 3.0, 2.0, 3.0, 4.0];
        let pattern = vec![2.0, 3.0];
        let matches = ctx.pattern_match(&data, &pattern).await.unwrap();
        assert_eq!(matches, vec![1, 3]);
    }

    #[tokio::test]
    async fn test_gpu_buffer() {
        // Round-trip the simulated device residency marker.
        let mut buffer = GpuBuffer::new(vec![1.0, 2.0, 3.0]);
        assert!(!buffer.is_on_device());
        buffer.to_device().unwrap();
        assert!(buffer.is_on_device());
        buffer.from_device().unwrap();
        assert!(!buffer.is_on_device());
    }

    #[tokio::test]
    async fn test_stream_processor() {
        let processor =
            GpuStreamProcessor::new(GpuBackend::Cpu, GpuProcessorConfig::default()).unwrap();
        let batch = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let result = processor.process_batch(&batch).await.unwrap();
        assert_eq!(result.len(), batch.len());
    }

    #[tokio::test]
    async fn test_aggregation_operations() {
        let processor =
            GpuStreamProcessor::new(GpuBackend::Cpu, GpuProcessorConfig::default()).unwrap();
        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let sum = processor
            .aggregate_metrics(&values, AggregationOp::Sum)
            .await
            .unwrap();
        assert_eq!(sum, 15.0);
        let mean = processor
            .aggregate_metrics(&values, AggregationOp::Mean)
            .await
            .unwrap();
        assert_eq!(mean, 3.0);
        let max = processor
            .aggregate_metrics(&values, AggregationOp::Max)
            .await
            .unwrap();
        assert_eq!(max, 5.0);
        let min = processor
            .aggregate_metrics(&values, AggregationOp::Min)
            .await
            .unwrap();
        assert_eq!(min, 1.0);
    }

    #[tokio::test]
    async fn test_gpu_stats() {
        let ctx = GpuContext::new(GpuBackend::Cpu).unwrap();
        let _ = ctx.batch_process(&[1.0, 2.0, 3.0]).await;
        let _ = ctx.vector_add(&[1.0], &[2.0]).await;
        let stats = ctx.stats().await;
        assert!(stats.batches_processed > 0);
        assert!(stats.vector_operations > 0);
    }
}