use super::backend_management::GpuIoProcessor;
use crate::compression::{CompressionAlgorithm, ParallelCompressionConfig};
use crate::error::{IoError, Result};
use scirs2_core::gpu::{GpuBackend, GpuDataType};
use scirs2_core::ndarray::{Array1, ArrayView1};
use scirs2_core::simd_ops::PlatformCapabilities;
/// GPU-accelerated compression coordinator.
///
/// Routes sufficiently large buffers to a backend-specific chunked
/// compression path (CUDA / Metal / OpenCL) and refuses payloads at or
/// below `compression_threshold`, for which callers are expected to use a
/// plain CPU codec instead.
#[derive(Debug)]
pub struct GpuCompressionProcessor {
// Backend probe/handle; its `backend()` selects the compression strategy.
gpu_processor: GpuIoProcessor,
// Minimum payload size in bytes (strict lower bound) for the GPU path.
compression_threshold: usize,
}
impl GpuCompressionProcessor {
    /// Create a processor with the default 10 MiB GPU offload threshold.
    pub fn new() -> Result<Self> {
        Self::with_threshold(10 * 1024 * 1024)
    }

    /// Create a processor that only uses the GPU path for payloads strictly
    /// larger than `threshold` bytes.
    ///
    /// Backend probing is best-effort: if `GpuIoProcessor::new()` fails, we
    /// fall back to its default backend rather than erroring, so this
    /// constructor currently always returns `Ok`; the `Result` is kept for
    /// forward compatibility with stricter detection.
    pub fn with_threshold(threshold: usize) -> Result<Self> {
        let gpu = GpuIoProcessor::new().unwrap_or_default();
        Ok(Self {
            gpu_processor: gpu,
            compression_threshold: threshold,
        })
    }

    /// Compress `data` on the active GPU backend.
    ///
    /// `level` overrides the backend's default compression level (clamped to
    /// the algorithm's valid range).
    ///
    /// # Errors
    /// Returns an error when the payload is at or below the configured
    /// threshold, when the input view is not contiguous, when the active
    /// backend is unsupported for compression, or when the underlying codec
    /// fails.
    pub fn compress_gpu<T: GpuDataType>(
        &self,
        data: &ArrayView1<T>,
        algorithm: CompressionAlgorithm,
        level: Option<u32>,
    ) -> Result<Vec<u8>> {
        let data_bytes = data.len() * std::mem::size_of::<T>();
        if !self.should_use_gpu(data_bytes) {
            return Err(IoError::Other(
                "Data size too small for GPU acceleration".to_string(),
            ));
        }
        match self.gpu_processor.backend() {
            GpuBackend::Cuda => self.compress_cuda(data, algorithm, level),
            GpuBackend::Metal => self.compress_metal(data, algorithm, level),
            GpuBackend::OpenCL => self.compress_opencl(data, algorithm, level),
            other => Err(IoError::Other(format!(
                "GPU backend {} not supported for compression",
                other
            ))),
        }
    }

    /// Decompress a container previously produced by [`Self::compress_gpu`].
    ///
    /// `expected_size` is the anticipated decompressed size in bytes; it is
    /// used to decide whether the GPU path is worthwhile.
    ///
    /// # Errors
    /// Returns an error when `expected_size` is at or below the threshold,
    /// when the container header is malformed, when the backend is
    /// unsupported, or when a chunk fails to decompress.
    pub fn decompress_gpu<T: GpuDataType>(
        &self,
        compressed_data: &[u8],
        algorithm: CompressionAlgorithm,
        expected_size: usize,
    ) -> Result<Array1<T>> {
        if !self.should_use_gpu(expected_size) {
            return Err(IoError::Other(
                "Data size too small for GPU acceleration".to_string(),
            ));
        }
        match self.gpu_processor.backend() {
            GpuBackend::Cuda => self.decompress_cuda(compressed_data, algorithm, expected_size),
            GpuBackend::Metal => self.decompress_metal(compressed_data, algorithm, expected_size),
            GpuBackend::OpenCL => {
                self.decompress_opencl(compressed_data, algorithm, expected_size)
            }
            other => Err(IoError::Other(format!(
                "GPU backend {} not supported for decompression",
                other
            ))),
        }
    }

    /// True when `size` (in bytes) exceeds the configured threshold, i.e.
    /// the payload is large enough to amortize GPU transfer overhead.
    pub(crate) fn should_use_gpu(&self, size: usize) -> bool {
        size > self.compression_threshold
    }

    /// Reinterpret a contiguous 1-D view as raw bytes.
    ///
    /// The previous implementation called `slice::from_raw_parts` on
    /// `data.as_ptr()` unconditionally, which is unsound for non-contiguous
    /// (strided) views; such views are now rejected explicitly.
    fn view_as_bytes<'a, T: GpuDataType>(data: &'a ArrayView1<'_, T>) -> Result<&'a [u8]> {
        let slice = data.as_slice().ok_or_else(|| {
            IoError::Other("GPU compression requires a contiguous input array".to_string())
        })?;
        // SAFETY: `slice` is a valid, contiguous `&[T]`; viewing the same
        // memory as `size_of_val(slice)` bytes is always sound.
        Ok(unsafe {
            std::slice::from_raw_parts(slice.as_ptr() as *const u8, std::mem::size_of_val(slice))
        })
    }

    /// Compress `data_bytes` in independent `chunk_size` windows.
    ///
    /// `default_zstd_level` is backend-specific and only used when the caller
    /// does not pass an explicit `level`. `backend_name` is only used in the
    /// unsupported-algorithm error message.
    fn compress_chunks(
        data_bytes: &[u8],
        chunk_size: usize,
        algorithm: CompressionAlgorithm,
        level: Option<u32>,
        default_zstd_level: u32,
        backend_name: &str,
    ) -> Result<Vec<Vec<u8>>> {
        data_bytes
            .chunks(chunk_size)
            .map(|chunk| match algorithm {
                CompressionAlgorithm::Gzip => {
                    // Gzip levels are 1..=9; 6 is the conventional default.
                    let compression_level = level.unwrap_or(6).clamp(1, 9) as u8;
                    oxiarc_deflate::gzip_compress(chunk, compression_level)
                        .map_err(|e| IoError::Other(format!("Gzip compression error: {}", e)))
                }
                CompressionAlgorithm::Zstd => {
                    let compression_level = level.unwrap_or(default_zstd_level).clamp(1, 19);
                    oxiarc_zstd::compress_with_level(chunk, compression_level as i32)
                        .map_err(|e| IoError::Other(format!("Zstd compression error: {}", e)))
                }
                CompressionAlgorithm::Lz4 => oxiarc_lz4::compress_block(chunk)
                    .map_err(|e| IoError::Other(format!("LZ4 compression error: {}", e))),
                _ => Err(IoError::UnsupportedFormat(format!(
                    "Compression algorithm {:?} not supported for {}",
                    algorithm, backend_name
                ))),
            })
            .collect()
    }

    /// Serialize chunks into the shared container used by the CUDA and Metal
    /// paths: `magic | version=1 | chunk count | chunk sizes… | payloads…`,
    /// all integers little-endian `u32`.
    fn encode_simple_container(magic: &[u8; 4], chunks: Vec<Vec<u8>>) -> Vec<u8> {
        let payload: usize = chunks.iter().map(Vec::len).sum();
        let mut result = Vec::with_capacity(12 + 4 * chunks.len() + payload);
        result.extend_from_slice(magic);
        result.extend_from_slice(&1u32.to_le_bytes());
        result.extend_from_slice(&(chunks.len() as u32).to_le_bytes());
        for chunk in &chunks {
            result.extend_from_slice(&(chunk.len() as u32).to_le_bytes());
        }
        for chunk in chunks {
            result.extend_from_slice(&chunk);
        }
        result
    }

    /// CUDA path: 1 MiB chunks (512 KiB without SIMD), zstd default level 3.
    fn compress_cuda<T: GpuDataType>(
        &self,
        data: &ArrayView1<T>,
        algorithm: CompressionAlgorithm,
        level: Option<u32>,
    ) -> Result<Vec<u8>> {
        let data_bytes = Self::view_as_bytes(data)?;
        let chunk_size = if self.gpu_processor.capabilities.simd_available {
            1024 * 1024
        } else {
            512 * 1024
        };
        let chunks = Self::compress_chunks(data_bytes, chunk_size, algorithm, level, 3, "CUDA")?;
        Ok(Self::encode_simple_container(b"CUDA", chunks))
    }

    /// Metal path: 768 KiB chunks (384 KiB without SIMD), zstd default 4.
    fn compress_metal<T: GpuDataType>(
        &self,
        data: &ArrayView1<T>,
        algorithm: CompressionAlgorithm,
        level: Option<u32>,
    ) -> Result<Vec<u8>> {
        let data_bytes = Self::view_as_bytes(data)?;
        let chunk_size = if self.gpu_processor.capabilities.simd_available {
            768 * 1024
        } else {
            384 * 1024
        };
        let chunks = Self::compress_chunks(data_bytes, chunk_size, algorithm, level, 4, "Metal")?;
        Ok(Self::encode_simple_container(b"METL", chunks))
    }

    /// OpenCL path: 512 KiB chunks (256 KiB without SIMD), zstd default 5.
    ///
    /// The OpenCL container extends the common layout with a compute-unit
    /// field and a per-chunk index:
    /// `OPCL | version=1 | units=32 | count | (index, size)… | payloads…`.
    fn compress_opencl<T: GpuDataType>(
        &self,
        data: &ArrayView1<T>,
        algorithm: CompressionAlgorithm,
        level: Option<u32>,
    ) -> Result<Vec<u8>> {
        let data_bytes = Self::view_as_bytes(data)?;
        let chunk_size = if self.gpu_processor.capabilities.simd_available {
            512 * 1024
        } else {
            256 * 1024
        };
        let chunks = Self::compress_chunks(data_bytes, chunk_size, algorithm, level, 5, "OpenCL")?;
        let payload: usize = chunks.iter().map(Vec::len).sum();
        let mut result = Vec::with_capacity(16 + 8 * chunks.len() + payload);
        result.extend_from_slice(b"OPCL");
        result.extend_from_slice(&1u32.to_le_bytes());
        // Fixed nominal compute-unit count recorded in the header; readers
        // skip it during decoding.
        result.extend_from_slice(&32u32.to_le_bytes());
        result.extend_from_slice(&(chunks.len() as u32).to_le_bytes());
        for (index, chunk) in chunks.iter().enumerate() {
            result.extend_from_slice(&(index as u32).to_le_bytes());
            result.extend_from_slice(&(chunk.len() as u32).to_le_bytes());
        }
        for chunk in chunks {
            result.extend_from_slice(&chunk);
        }
        Ok(result)
    }

    /// Read a little-endian `u32` at `offset`, or `None` when out of bounds.
    fn read_u32_le(data: &[u8], offset: usize) -> Option<u32> {
        let b = data.get(offset..offset + 4)?;
        Some(u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
    }

    /// Parse the `num_chunks`-entry chunk-size table that follows the common
    /// 12-byte `magic | version | count` header (CUDA and Metal containers).
    /// Returns the offset of the first payload byte and the per-chunk sizes.
    fn parse_size_table(
        data: &[u8],
        num_chunks: usize,
        backend: &str,
    ) -> Result<(usize, Vec<usize>)> {
        // Validate that the whole table fits inside `data` *before*
        // allocating `num_chunks` capacity, so a corrupt or hostile header
        // cannot force a huge allocation; checked arithmetic also rules out
        // overflow of the table length.
        let table_end = num_chunks
            .checked_mul(4)
            .and_then(|len| len.checked_add(12))
            .filter(|&end| end <= data.len())
            .ok_or_else(|| {
                IoError::Other(format!("Invalid {} compressed data format", backend))
            })?;
        let mut chunk_sizes = Vec::with_capacity(num_chunks);
        for entry in 0..num_chunks {
            let size = Self::read_u32_le(data, 12 + 4 * entry)
                .expect("table bounds were validated above");
            chunk_sizes.push(size as usize);
        }
        Ok((table_end, chunk_sizes))
    }

    /// Decode a CUDA (`CUDA` magic) container.
    fn decompress_cuda<T: GpuDataType>(
        &self,
        data: &[u8],
        algorithm: CompressionAlgorithm,
        _expected_size: usize,
    ) -> Result<Array1<T>> {
        if data.len() < 12 || &data[0..4] != b"CUDA" {
            return Err(IoError::Other(
                "Invalid CUDA compressed data format".to_string(),
            ));
        }
        let version = Self::read_u32_le(data, 4).expect("header length checked above");
        if version != 1 {
            return Err(IoError::Other(
                "Unsupported CUDA compression version".to_string(),
            ));
        }
        let num_chunks = Self::read_u32_le(data, 8).expect("header length checked above") as usize;
        let (offset, chunk_sizes) = Self::parse_size_table(data, num_chunks, "CUDA")?;
        self.decompress_chunks_parallel(data, offset, &chunk_sizes, algorithm)
    }

    /// Decode a Metal (`METL` magic) container; data without the Metal magic
    /// is forwarded to the CUDA decoder, which shares the same layout.
    fn decompress_metal<T: GpuDataType>(
        &self,
        data: &[u8],
        algorithm: CompressionAlgorithm,
        expected_size: usize,
    ) -> Result<Array1<T>> {
        if data.len() < 12 || &data[0..4] != b"METL" {
            return self.decompress_cuda(data, algorithm, expected_size);
        }
        let version = Self::read_u32_le(data, 4).expect("header length checked above");
        if version != 1 {
            return Err(IoError::Other(
                "Unsupported Metal compression version".to_string(),
            ));
        }
        let num_chunks = Self::read_u32_le(data, 8).expect("header length checked above") as usize;
        let (offset, chunk_sizes) = Self::parse_size_table(data, num_chunks, "Metal")?;
        self.decompress_chunks_parallel(data, offset, &chunk_sizes, algorithm)
    }

    /// Decode an OpenCL (`OPCL` magic) container; data without the OpenCL
    /// magic is forwarded to the CUDA decoder.
    fn decompress_opencl<T: GpuDataType>(
        &self,
        data: &[u8],
        algorithm: CompressionAlgorithm,
        expected_size: usize,
    ) -> Result<Array1<T>> {
        if data.len() < 16 || &data[0..4] != b"OPCL" {
            return self.decompress_cuda(data, algorithm, expected_size);
        }
        let version = Self::read_u32_le(data, 4).expect("header length checked above");
        if version != 1 {
            return Err(IoError::Other(
                "Unsupported OpenCL compression version".to_string(),
            ));
        }
        // Bytes 8..12 hold the writer's nominal compute-unit count; it does
        // not affect decoding and is skipped.
        let num_chunks =
            Self::read_u32_le(data, 12).expect("header length checked above") as usize;
        // Pre-validate the 8-byte-per-entry (index, size) table before
        // allocating, mirroring `parse_size_table`.
        let table_end = num_chunks
            .checked_mul(8)
            .and_then(|len| len.checked_add(16))
            .filter(|&end| end <= data.len())
            .ok_or_else(|| {
                IoError::Other("Invalid OpenCL compressed data format".to_string())
            })?;
        let mut chunk_sizes = Vec::with_capacity(num_chunks);
        for entry in 0..num_chunks {
            // The per-entry chunk index is redundant for sequential decoding
            // and is ignored; only the size (second u32) is read.
            let size = Self::read_u32_le(data, 16 + 8 * entry + 4)
                .expect("table bounds were validated above");
            chunk_sizes.push(size as usize);
        }
        self.decompress_chunks_parallel(data, table_end, &chunk_sizes, algorithm)
    }

    /// Decompress the listed chunks in parallel, concatenate them, and
    /// reinterpret the resulting bytes as `T` elements.
    fn decompress_chunks_parallel<T: GpuDataType>(
        &self,
        data: &[u8],
        mut offset: usize,
        chunk_sizes: &[usize],
        algorithm: CompressionAlgorithm,
    ) -> Result<Array1<T>> {
        use scirs2_core::parallel_ops::*;
        // Slice out each chunk's compressed bytes with overflow-safe bounds
        // checks (`offset + size` could wrap on corrupt size tables).
        let mut chunk_data = Vec::with_capacity(chunk_sizes.len());
        for &size in chunk_sizes {
            let end = offset
                .checked_add(size)
                .filter(|&end| end <= data.len())
                .ok_or_else(|| IoError::Other("Invalid compressed data format".to_string()))?;
            chunk_data.push(&data[offset..end]);
            offset = end;
        }
        let decompressed_chunks: Result<Vec<Vec<u8>>> = chunk_data
            .par_iter()
            .map(|chunk| match algorithm {
                CompressionAlgorithm::Gzip => oxiarc_deflate::gzip_decompress(chunk)
                    .map_err(|e| IoError::Other(format!("Gzip decompression error: {}", e))),
                CompressionAlgorithm::Zstd => oxiarc_zstd::decompress(chunk)
                    .map_err(|e| IoError::Other(format!("Zstd decompression error: {}", e))),
                CompressionAlgorithm::Lz4 => {
                    // NOTE(review): 10x is a heuristic output bound; a highly
                    // compressible block could exceed it — confirm against
                    // the compressor's chunking guarantees.
                    let max_size = chunk.len() * 10;
                    oxiarc_lz4::decompress_block(chunk, max_size)
                        .map_err(|e| IoError::Other(format!("LZ4 decompression error: {}", e)))
                }
                _ => Err(IoError::UnsupportedFormat(format!(
                    "Compression algorithm {:?} not supported for GPU decompression",
                    algorithm
                ))),
            })
            .collect();
        let chunks = decompressed_chunks?;
        let total: usize = chunks.iter().map(Vec::len).sum();
        let mut combined_data = Vec::with_capacity(total);
        for chunk in chunks {
            combined_data.extend_from_slice(&chunk);
        }
        let element_size = std::mem::size_of::<T>();
        if element_size == 0 || combined_data.len() % element_size != 0 {
            return Err(IoError::Other(
                "Decompressed data size mismatch".to_string(),
            ));
        }
        let num_elements = combined_data.len() / element_size;
        // The previous cast `combined_data.as_ptr() as *const T` assumed the
        // Vec<u8> allocation happened to be aligned for T, which is not
        // guaranteed; copy into a properly aligned Vec<T> instead.
        let mut typed_data: Vec<T> = Vec::with_capacity(num_elements);
        // SAFETY: destination capacity is exactly
        // `num_elements * element_size == combined_data.len()` bytes, the
        // regions do not overlap, and `T: GpuDataType` is presumably a
        // plain-data numeric type for which any byte pattern of the right
        // length is a valid value (as the original transmute also assumed).
        unsafe {
            std::ptr::copy_nonoverlapping(
                combined_data.as_ptr(),
                typed_data.as_mut_ptr() as *mut u8,
                combined_data.len(),
            );
            typed_data.set_len(num_elements);
        }
        Ok(Array1::from_vec(typed_data))
    }

    /// Report the backend, threshold, and estimated throughput currently in
    /// effect. Falls back to conservative CPU-backend numbers when backend
    /// capability probing fails.
    pub fn get_performance_stats(&self) -> CompressionStats {
        let capabilities = self
            .gpu_processor
            .get_backend_capabilities()
            .unwrap_or_else(|_| {
                use super::backend_management::BackendCapabilities;
                BackendCapabilities {
                    backend: scirs2_core::gpu::GpuBackend::Cpu,
                    memory_gb: 1.0,
                    max_work_group_size: 64,
                    supports_fp64: false,
                    supports_fp16: false,
                    compute_units: 1,
                    max_allocation_size: 1024 * 1024,
                    local_memory_size: 64 * 1024,
                }
            });
        CompressionStats {
            backend: capabilities.backend,
            threshold_bytes: self.compression_threshold,
            estimated_throughput_gbps: capabilities.estimate_memory_bandwidth(),
            parallel_chunks: capabilities.compute_units,
        }
    }
}
impl Default for GpuCompressionProcessor {
    /// Build a processor via the normal constructor; if that ever fails,
    /// degrade to a default backend with the standard 10 MiB threshold.
    fn default() -> Self {
        match Self::new() {
            Ok(processor) => processor,
            Err(_) => Self {
                gpu_processor: GpuIoProcessor::default(),
                compression_threshold: 10 * 1024 * 1024,
            },
        }
    }
}
/// Snapshot of the compression configuration and backend capabilities,
/// as returned by `GpuCompressionProcessor::get_performance_stats`.
#[derive(Debug, Clone)]
pub struct CompressionStats {
// Backend the processor will use (falls back to `Cpu` when probing fails).
pub backend: scirs2_core::gpu::GpuBackend,
// GPU offload threshold in bytes (strict lower bound on payload size).
pub threshold_bytes: usize,
// Estimated memory bandwidth reported by the backend, in GB/s.
pub estimated_throughput_gbps: f64,
// Number of compute units, used as the parallel chunk count.
pub parallel_chunks: usize,
}
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::arr1;

    #[test]
    fn test_compression_processor_creation() {
        let processor = GpuCompressionProcessor::default();
        assert!(processor.compression_threshold > 0);
    }

    #[test]
    fn test_compression_threshold() {
        // Previously used `unwrap_or_default()`, which on a constructor error
        // would silently substitute the 10 MiB default threshold and make the
        // assertions below fail for the wrong reason; fail loudly instead.
        let processor =
            GpuCompressionProcessor::with_threshold(1024).expect("with_threshold should succeed");
        assert!(!processor.should_use_gpu(512));
        // The threshold is a strict lower bound: exactly-equal sizes stay on CPU.
        assert!(!processor.should_use_gpu(1024));
        assert!(processor.should_use_gpu(2048));
    }

    #[test]
    fn test_compression_stats() {
        let processor = GpuCompressionProcessor::default();
        let stats = processor.get_performance_stats();
        assert!(stats.threshold_bytes > 0);
        assert!(stats.parallel_chunks > 0);
    }

    #[test]
    fn test_small_data_compression() {
        let processor = GpuCompressionProcessor::default();
        // 4 f32 values = 16 bytes, far below any sane threshold, so the GPU
        // path must refuse rather than round-trip tiny payloads.
        let small_data = arr1(&[1.0f32, 2.0, 3.0, 4.0]);
        let result = processor.compress_gpu(&small_data.view(), CompressionAlgorithm::Lz4, None);
        assert!(result.is_err());
    }
}