use alloc::collections::BTreeMap;
use alloc::vec::Vec;
use lazy_static::lazy_static;
use spin::Mutex;
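/// Compute backend used to execute compression kernels. `Cpu` is the
/// software fallback used when no GPU backend is available.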
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuBackend {
Cpu,
Cuda,
Rocm,
Vulkan,
}
impl GpuBackend {
pub fn name(&self) -> &'static str {
match self {
GpuBackend::Cpu => "CPU (fallback)",
GpuBackend::Cuda => "NVIDIA CUDA",
GpuBackend::Rocm => "AMD ROCm",
GpuBackend::Vulkan => "Vulkan Compute",
}
}
pub fn is_gpu(&self) -> bool {
!matches!(self, GpuBackend::Cpu)
}
}
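/// Compression algorithms that can be offloaded to the GPU.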
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuCompressAlgo {
Lz4,
Zstd,
}
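/// Static description of a compression-capable device: identity, backend,
/// compute resources, and which algorithms its kernels support.
/// `memory_bandwidth` is in GB/s (as used by the reference profiles below)
/// and `max_buffer_size` is in bytes.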
#[derive(Debug, Clone)]
pub struct GpuCapabilities {
pub device_id: u32,
pub name: &'static str,
pub backend: GpuBackend,
pub compute_units: u32,
pub memory_bandwidth: u32,
pub max_buffer_size: u64,
pub supports_lz4: bool,
pub supports_zstd: bool,
}
impl GpuCapabilities {
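/// Default profile: CPU-fallback backend, 16 GiB maximum buffer, and both
/// LZ4 and Zstd enabled.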
pub fn new(
device_id: u32,
name: &'static str,
compute_units: u32,
memory_bandwidth: u32,
) -> Self {
Self {
device_id,
name,
backend: GpuBackend::Cpu,
compute_units,
memory_bandwidth,
max_buffer_size: 16 * 1024 * 1024 * 1024,
supports_lz4: true,
supports_zstd: true,
}
}
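/// Capabilities reported by a real CUDA device; only available with the
/// `gpu-compute` feature.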
#[cfg(feature = "gpu-compute")]
pub fn from_cuda(
device_id: u32,
name: &'static str,
compute_units: u32,
memory_bandwidth: u32,
max_buffer_size: u64,
) -> Self {
Self {
device_id,
name,
backend: GpuBackend::Cuda,
compute_units,
memory_bandwidth,
max_buffer_size,
supports_lz4: true,
supports_zstd: true,
}
}
pub fn supports_algo(&self, algo: GpuCompressAlgo) -> bool {
match algo {
GpuCompressAlgo::Lz4 => self.supports_lz4,
GpuCompressAlgo::Zstd => self.supports_zstd,
}
}
pub fn is_real_gpu(&self) -> bool {
self.backend.is_gpu()
}
}
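/// A compression request queued on the device; `submitted` records the
/// caller-supplied timestamp at submission time.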
#[derive(Debug, Clone)]
pub struct GpuCompressCommand {
pub cmd_id: u64,
pub algo: GpuCompressAlgo,
pub input_size: u64,
pub output_size: u64,
pub level: u8,
pub submitted: u64,
}
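/// Outcome of a completed compression command, including the achieved
/// ratio and the GPU/total execution times in microseconds.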
#[derive(Debug, Clone)]
pub struct GpuCompressResult {
pub cmd_id: u64,
pub compressed_size: u64,
pub ratio: f32,
pub gpu_time_us: u64,
pub total_time_us: u64,
}
impl GpuCompressResult {
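/// Effective input throughput in gigabytes per second.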
pub fn throughput_gbps(&self, input_size: u64) -> f32 {
if self.total_time_us == 0 {
return 0.0;
}
(input_size as f64 / (self.total_time_us as f64 / 1_000_000.0) / 1e9) as f32
}
pub fn speedup_vs_cpu(&self, input_size: u64, algo: GpuCompressAlgo) -> f32 {
// Convert GB/s to MB/s so it can be compared with the CPU baselines below.
let gpu_throughput = self.throughput_gbps(input_size) * 1000.0;
// Rough single-threaded CPU baselines in MB/s.
let cpu_throughput = match algo {
GpuCompressAlgo::Lz4 => 500.0,
GpuCompressAlgo::Zstd => 200.0,
};
gpu_throughput / cpu_throughput
}
}
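/// Running counters for all compression activity, covering both GPU
/// offloads and CPU fallbacks.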
#[derive(Debug, Clone, Default)]
pub struct GpuCompressStats {
pub total_ops: u64,
pub gpu_ops: u64,
pub cpu_fallback: u64,
pub total_bytes: u64,
pub compressed_bytes: u64,
pub total_gpu_time_us: u64,
pub total_time_us: u64,
}
impl GpuCompressStats {
pub fn offload_ratio(&self) -> f32 {
if self.total_ops == 0 {
return 0.0;
}
self.gpu_ops as f32 / self.total_ops as f32
}
pub fn avg_throughput_gbps(&self) -> f32 {
if self.total_time_us == 0 {
return 0.0;
}
(self.total_bytes as f64 / (self.total_time_us as f64 / 1_000_000.0) / 1e9) as f32
}
pub fn avg_compression_ratio(&self) -> f32 {
if self.compressed_bytes == 0 {
return 1.0;
}
self.total_bytes as f32 / self.compressed_bytes as f32
}
pub fn avg_speedup(&self) -> f32 {
// Weighted CPU baseline in MB/s, assuming a 70% LZ4 / 30% Zstd workload mix.
let cpu_throughput = 0.7 * 500.0 + 0.3 * 200.0;
let gpu_throughput = self.avg_throughput_gbps() * 1000.0;
if cpu_throughput == 0.0 {
return 1.0;
}
gpu_throughput / cpu_throughput
}
}
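/// Tracks the registered device, in-flight commands, and aggregate
/// statistics. Commands stay in `pending` until `complete` is called.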
pub struct GpuCompressManager {
capabilities: Option<GpuCapabilities>,
pending: BTreeMap<u64, GpuCompressCommand>,
next_cmd_id: u64,
stats: GpuCompressStats,
}
impl Default for GpuCompressManager {
fn default() -> Self {
Self::new()
}
}
impl GpuCompressManager {
pub fn new() -> Self {
Self {
capabilities: None,
pending: BTreeMap::new(),
next_cmd_id: 1,
stats: GpuCompressStats::default(),
}
}
pub fn register_device(&mut self, caps: GpuCapabilities) {
self.capabilities = Some(caps);
}
pub fn is_available(&self) -> bool {
self.capabilities.is_some()
}
pub fn submit(
&mut self,
algo: GpuCompressAlgo,
input_size: u64,
level: u8,
timestamp: u64,
) -> Option<u64> {
if let Some(caps) = &self.capabilities {
if !caps.supports_algo(algo) {
self.stats.cpu_fallback += 1;
return None;
}
if input_size > caps.max_buffer_size {
self.stats.cpu_fallback += 1;
return None;
}
let cmd_id = self.next_cmd_id;
self.next_cmd_id += 1;
let cmd = GpuCompressCommand {
cmd_id,
algo,
input_size,
output_size: input_size,
level,
submitted: timestamp,
};
self.pending.insert(cmd_id, cmd);
self.stats.gpu_ops += 1;
self.stats.total_ops += 1;
Some(cmd_id)
} else {
self.stats.cpu_fallback += 1;
self.stats.total_ops += 1;
None
}
}
pub fn complete(
&mut self,
cmd_id: u64,
compressed_size: u64,
current_time: u64,
) -> Option<GpuCompressResult> {
if let Some(cmd) = self.pending.remove(&cmd_id) {
// Simulated timings in microseconds: estimate a CPU baseline from the
// algorithm, model the GPU kernel as ~10x faster, and charge a transfer
// cost in each direction.
let cpu_time = match cmd.algo {
GpuCompressAlgo::Lz4 => cmd.input_size / 500_000,
GpuCompressAlgo::Zstd => cmd.input_size / 200_000,
};
let gpu_time = cpu_time / 10;
let transfer_overhead = cmd.input_size / 50_000_000;
let total_time = gpu_time + transfer_overhead * 2;
let result = GpuCompressResult {
cmd_id,
compressed_size,
ratio: cmd.input_size as f32 / compressed_size as f32,
gpu_time_us: gpu_time,
total_time_us: total_time.max(1),
};
self.stats.total_bytes += cmd.input_size;
self.stats.compressed_bytes += compressed_size;
self.stats.total_gpu_time_us += gpu_time;
self.stats.total_time_us += total_time;
Some(result)
} else {
None
}
}
pub fn stats(&self) -> GpuCompressStats {
self.stats.clone()
}
pub fn capabilities(&self) -> Option<&GpuCapabilities> {
self.capabilities.as_ref()
}
}
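// Global engine instance shared by the static `GpuCompressEngine` facade.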
lazy_static! {
static ref GPU_COMPRESS_ENGINE: Mutex<GpuCompressManager> =
Mutex::new(GpuCompressManager::new());
}
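/// Stateless facade over the global `GPU_COMPRESS_ENGINE`; each method
/// takes the spinlock and delegates to `GpuCompressManager`.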
pub struct GpuCompressEngine;
impl GpuCompressEngine {
pub fn register_device(caps: GpuCapabilities) {
let mut engine = GPU_COMPRESS_ENGINE.lock();
engine.register_device(caps);
}
pub fn is_available() -> bool {
let engine = GPU_COMPRESS_ENGINE.lock();
engine.is_available()
}
pub fn submit(
algo: GpuCompressAlgo,
input_size: u64,
level: u8,
timestamp: u64,
) -> Option<u64> {
let mut engine = GPU_COMPRESS_ENGINE.lock();
engine.submit(algo, input_size, level, timestamp)
}
pub fn complete(
cmd_id: u64,
compressed_size: u64,
current_time: u64,
) -> Option<GpuCompressResult> {
let mut engine = GPU_COMPRESS_ENGINE.lock();
engine.complete(cmd_id, compressed_size, current_time)
}
pub fn stats() -> GpuCompressStats {
let engine = GPU_COMPRESS_ENGINE.lock();
engine.stats()
}
pub fn capabilities() -> Option<GpuCapabilities> {
let engine = GPU_COMPRESS_ENGINE.lock();
engine.capabilities().cloned()
}
}
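/// Reference capability profile for an NVIDIA RTX 4090 (128 SMs,
/// 1008 GB/s). Built with the default constructor, so the backend is
/// reported as CPU fallback until a real backend is registered.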
pub fn create_nvidia_rtx4090() -> GpuCapabilities {
GpuCapabilities::new(
0,
"NVIDIA RTX 4090",
128,
1008,
)
}
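/// Reference capability profile for an AMD MI300X (304 compute units,
/// 5300 GB/s), likewise using the default CPU-fallback backend.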
pub fn create_amd_mi300x() -> GpuCapabilities {
GpuCapabilities::new(
1,
"AMD MI300X",
304,
5300,
)
}
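/// Low-level interface a GPU driver implements to expose device memory
/// management and compression kernels. Addresses are opaque device
/// pointers; the compression entry points return the compressed size,
/// or 0 on failure.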
pub trait GpuComputeProvider: Send + Sync {
fn allocate(&self, size: usize) -> Option<u64>;
fn free(&self, device_addr: u64);
fn copy_to_device(&self, device_addr: u64, data: &[u8]) -> bool;
fn copy_from_device(&self, device_addr: u64, data: &mut [u8]) -> bool;
fn compress_lz4(
&self,
input_addr: u64,
input_size: usize,
output_addr: u64,
output_size: usize,
) -> usize;
fn compress_zstd(
&self,
input_addr: u64,
input_size: usize,
output_addr: u64,
output_size: usize,
level: u8,
) -> usize;
fn synchronize(&self);
}
#[cfg(feature = "gpu-compute")]
static GPU_COMPUTE_PROVIDER: spin::Once<&'static dyn GpuComputeProvider> = spin::Once::new();
#[cfg(feature = "gpu-compute")]
pub fn register_compute_provider(provider: &'static dyn GpuComputeProvider) {
GPU_COMPUTE_PROVIDER.call_once(|| provider);
}
#[cfg(feature = "gpu-compute")]
pub fn get_compute_provider() -> Option<&'static dyn GpuComputeProvider> {
GPU_COMPUTE_PROVIDER.get().copied()
}
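/// Compress `input` on the registered compute provider: allocate device
/// buffers, copy the input in, run the kernel, and copy the result back.
/// Returns `None` if no provider is registered or any step fails.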
#[cfg(feature = "gpu-compute")]
pub fn gpu_compress(algo: GpuCompressAlgo, input: &[u8], level: u8) -> Option<Vec<u8>> {
let provider = get_compute_provider()?;
let input_addr = provider.allocate(input.len())?;
// Reserve some headroom for incompressible inputs.
let output_size = input.len() + 1024;
let output_addr = match provider.allocate(output_size) {
Some(addr) => addr,
None => {
// Don't leak the input buffer if the output allocation fails.
provider.free(input_addr);
return None;
}
};
if !provider.copy_to_device(input_addr, input) {
provider.free(input_addr);
provider.free(output_addr);
return None;
}
let compressed_size = match algo {
GpuCompressAlgo::Lz4 => {
provider.compress_lz4(input_addr, input.len(), output_addr, output_size)
}
GpuCompressAlgo::Zstd => {
provider.compress_zstd(input_addr, input.len(), output_addr, output_size, level)
}
};
if compressed_size == 0 {
provider.free(input_addr);
provider.free(output_addr);
return None;
}
let mut output = alloc::vec![0u8; compressed_size];
if !provider.copy_from_device(output_addr, &mut output) {
provider.free(input_addr);
provider.free(output_addr);
return None;
}
provider.free(input_addr);
provider.free(output_addr);
Some(output)
}
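/// Compress `input`, preferring the GPU path when the `gpu-compute`
/// feature is enabled and a provider is registered, and otherwise falling
/// back to CPU implementations (LZ4 via `lz4_flex`; Zstd via the `zstd`
/// crate when `std` is available, else LZ4 as a stand-in).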
pub fn compress(algo: GpuCompressAlgo, input: &[u8], level: u8) -> Vec<u8> {
#[cfg(feature = "gpu-compute")]
if let Some(result) = gpu_compress(algo, input, level) {
return result;
}
match algo {
GpuCompressAlgo::Lz4 => lz4_flex::compress_prepend_size(input),
GpuCompressAlgo::Zstd => {
#[cfg(feature = "std")]
{
let mut encoder = zstd::stream::Encoder::new(Vec::new(), level as i32).unwrap();
std::io::copy(&mut std::io::Cursor::new(input), &mut encoder).unwrap();
encoder.finish().unwrap()
}
#[cfg(not(feature = "std"))]
{
lz4_flex::compress_prepend_size(input)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_gpu_capabilities() {
let caps = create_nvidia_rtx4090();
assert_eq!(caps.name, "NVIDIA RTX 4090");
assert_eq!(caps.compute_units, 128);
assert_eq!(caps.memory_bandwidth, 1008);
assert!(caps.supports_lz4);
assert!(caps.supports_zstd);
}
#[test]
fn test_device_registration() {
let mut mgr = GpuCompressManager::new();
assert!(!mgr.is_available());
mgr.register_device(create_nvidia_rtx4090());
assert!(mgr.is_available());
}
#[test]
fn test_submit_command() {
let mut mgr = GpuCompressManager::new();
mgr.register_device(create_nvidia_rtx4090());
let cmd_id = mgr.submit(GpuCompressAlgo::Lz4, 1_000_000, 0, 0);
assert!(cmd_id.is_some());
let stats = mgr.stats();
assert_eq!(stats.gpu_ops, 1);
assert_eq!(stats.total_ops, 1);
}
#[test]
fn test_cpu_fallback() {
let mut mgr = GpuCompressManager::new();
let cmd_id = mgr.submit(GpuCompressAlgo::Lz4, 1_000_000, 0, 0);
assert!(cmd_id.is_none());
let stats = mgr.stats();
assert_eq!(stats.cpu_fallback, 1);
assert_eq!(stats.gpu_ops, 0);
}
#[test]
fn test_complete_command() {
let mut mgr = GpuCompressManager::new();
mgr.register_device(create_nvidia_rtx4090());
let cmd_id = mgr
.submit(GpuCompressAlgo::Lz4, 10_000_000, 0, 0)
.expect("test: operation should succeed");
let result = mgr
.complete(cmd_id, 5_000_000, 100)
.expect("test: operation should succeed");
assert_eq!(result.cmd_id, cmd_id);
assert_eq!(result.compressed_size, 5_000_000);
assert_eq!(result.ratio, 2.0);
}
#[test]
fn test_throughput_calculation() {
let result = GpuCompressResult {
cmd_id: 1,
compressed_size: 5_000_000,
ratio: 2.0,
gpu_time_us: 1000,
total_time_us: 2000,
};
let throughput = result.throughput_gbps(10_000_000);
assert!((throughput - 5.0).abs() < 0.1);
}
#[test]
fn test_speedup_calculation() {
let result = GpuCompressResult {
cmd_id: 1,
compressed_size: 5_000_000,
ratio: 2.0,
gpu_time_us: 1000,
total_time_us: 2000,
};
let speedup = result.speedup_vs_cpu(10_000_000, GpuCompressAlgo::Lz4);
assert!((speedup - 10.0).abs() < 0.5);
}
#[test]
fn test_statistics() {
let mut mgr = GpuCompressManager::new();
mgr.register_device(create_nvidia_rtx4090());
for i in 0..10 {
let cmd_id = mgr
.submit(GpuCompressAlgo::Lz4, 1_000_000, 0, i)
.expect("test: operation should succeed");
mgr.complete(cmd_id, 500_000, i + 100);
}
let stats = mgr.stats();
assert_eq!(stats.total_ops, 10);
assert_eq!(stats.gpu_ops, 10);
assert_eq!(stats.total_bytes, 10_000_000);
assert_eq!(stats.compressed_bytes, 5_000_000);
assert_eq!(stats.avg_compression_ratio(), 2.0);
}
#[test]
fn test_offload_ratio() {
let mut mgr = GpuCompressManager::new();
for _ in 0..5 {
mgr.submit(GpuCompressAlgo::Lz4, 1_000_000, 0, 0);
}
mgr.register_device(create_nvidia_rtx4090());
for _ in 0..15 {
mgr.submit(GpuCompressAlgo::Lz4, 1_000_000, 0, 0);
}
let stats = mgr.stats();
assert_eq!(stats.total_ops, 20);
assert_eq!(stats.gpu_ops, 15);
assert_eq!(stats.cpu_fallback, 5);
assert_eq!(stats.offload_ratio(), 0.75);
}
#[test]
fn test_large_buffer_fallback() {
let mut mgr = GpuCompressManager::new();
let mut caps = create_nvidia_rtx4090();
caps.max_buffer_size = 1_000_000;
mgr.register_device(caps);
let cmd_id = mgr.submit(GpuCompressAlgo::Lz4, 10_000_000, 0, 0);
assert!(cmd_id.is_none());
let stats = mgr.stats();
assert_eq!(stats.cpu_fallback, 1);
}
}