#![cfg(feature = "nvcomp-gpu")]
use std::ffi::{c_int, c_void};
use std::ptr::null_mut;
use std::sync::Mutex;
use super::algo::BitcompDataType;
use super::error::{Error, Result};
use super::nvcomp_sys::cuda::{
CUDA_SUCCESS, cudaError_t, cudaFree, cudaGetDeviceCount, cudaGetErrorString, cudaMalloc,
cudaMemcpyAsync, cudaMemcpyKind, cudaStream_t, cudaStreamCreate, cudaStreamDestroy,
cudaStreamSynchronize,
};
use super::nvcomp_sys::nvcomp::{
NVCOMP_TYPE_BFLOAT16, NVCOMP_TYPE_CHAR, NVCOMP_TYPE_DOUBLE, NVCOMP_TYPE_FLOAT, NVCOMP_TYPE_INT,
NVCOMP_TYPE_LONGLONG, NVCOMP_TYPE_SHORT, NVCOMP_TYPE_UCHAR, NVCOMP_TYPE_UINT,
NVCOMP_TYPE_ULONGLONG, NVCOMP_TYPE_USHORT, nvcompType_t,
};
use super::{Algo, Codec};
// Raw bindings to the C++ HLIF shim (ferro_nvcomp_*). All functions return a
// non-zero `c_int` on failure; the error text can then be fetched with
// `ferro_nvcomp_hlif_last_error_message`. Device pointers passed in must be
// valid CUDA device memory on the stream the handle was created with.
unsafe extern "C" {
    // Creates a Bitcomp HLIF manager bound to `user_stream`; writes the
    // opaque handle into `out_handle` on success.
    fn ferro_nvcomp_hlif_create_bitcomp(
        chunk_size: usize,
        algorithm: c_int,
        data_type: nvcompType_t,
        user_stream: cudaStream_t,
        out_handle: *mut *mut c_void,
    ) -> c_int;
    // Creates a Zstd HLIF manager bound to `user_stream`.
    fn ferro_nvcomp_hlif_create_zstd(
        chunk_size: usize,
        user_stream: cudaStream_t,
        out_handle: *mut *mut c_void,
    ) -> c_int;
    // Destroys a handle created by either create function. Safe to call once.
    fn ferro_nvcomp_hlif_destroy(handle: *mut c_void);
    // Writes the worst-case compressed size for `uncomp_bytes` into
    // `out_max_bytes`.
    fn ferro_nvcomp_hlif_max_compressed_size(
        handle: *mut c_void,
        uncomp_bytes: usize,
        out_max_bytes: *mut usize,
    ) -> c_int;
    // Compresses `uncomp_bytes` from device buffer `d_in` into device buffer
    // `d_out`; writes the produced size into `out_comp_bytes`.
    fn ferro_nvcomp_hlif_compress(
        handle: *mut c_void,
        d_in: *const u8,
        uncomp_bytes: usize,
        d_out: *mut u8,
        out_comp_bytes: *mut usize,
    ) -> c_int;
    // Decompresses the HLIF-framed buffer `d_comp` into `d_out`; writes the
    // decompressed size into `out_decomp_bytes`.
    fn ferro_nvcomp_hlif_decompress(
        handle: *mut c_void,
        d_comp: *const u8,
        d_out: *mut u8,
        out_decomp_bytes: *mut usize,
    ) -> c_int;
    // Reads the decompressed size from the HLIF header of `d_comp` without
    // decompressing.
    fn ferro_nvcomp_hlif_get_decompressed_output_size(
        handle: *mut c_void,
        d_comp: *const u8,
        out_decomp_bytes: *mut usize,
    ) -> c_int;
    // Copies the last error message into `buf` (capacity `buf_capacity`) and
    // returns the number of bytes written.
    fn ferro_nvcomp_hlif_last_error_message(
        buf: *mut std::ffi::c_char,
        buf_capacity: usize,
    ) -> usize;
}
/// Default HLIF chunk size (64 KiB) used by `BitcompHlifBackend::new` and
/// `ZstdHlifBackend::new` when the caller does not pick one explicitly.
pub const DEFAULT_HLIF_CHUNK_SIZE: usize = 65_536;
/// Fetches the shim's last error message and decodes it into a `String`.
///
/// The reported length is clamped to the local buffer, and decoding stops at
/// the first NUL byte so a C-style terminator never leaks into the message.
fn pull_shim_error() -> String {
    let mut scratch = [0u8; 1024];
    let written = unsafe {
        ferro_nvcomp_hlif_last_error_message(
            scratch.as_mut_ptr() as *mut std::ffi::c_char,
            scratch.len(),
        )
    };
    // Never trust the shim-reported length beyond our buffer.
    let limit = written.min(scratch.len() - 1);
    let text = &scratch[..limit];
    let end = text.iter().position(|&b| b == 0).unwrap_or(limit);
    String::from_utf8_lossy(&text[..end]).into_owned()
}
/// Converts a shim return code into a `Result`, attaching the shim's last
/// error message when the call failed.
fn check_shim(rc: c_int, what: &'static str) -> Result<()> {
    match rc {
        0 => Ok(()),
        code => {
            let msg = pull_shim_error();
            Err(Error::Compress(format!(
                "HLIF shim error in {what}: rc=0x{code:x} msg={msg}"
            )))
        }
    }
}
fn check_cuda(rc: cudaError_t, what: &'static str) -> Result<()> {
if rc == CUDA_SUCCESS {
return Ok(());
}
let msg = unsafe {
let s = cudaGetErrorString(rc);
if s.is_null() {
"unknown".to_string()
} else {
std::ffi::CStr::from_ptr(s).to_string_lossy().into_owned()
}
};
Err(Error::Compress(format!(
"CUDA error in {what}: code={rc} ({msg})"
)))
}
/// Reports whether at least one CUDA device is visible to the runtime.
pub fn cuda_available() -> bool {
    let mut device_count: c_int = 0;
    let status = unsafe { cudaGetDeviceCount(&mut device_count) };
    status == CUDA_SUCCESS && device_count > 0
}
fn bitcomp_to_nvcomp_type(dt: BitcompDataType) -> nvcompType_t {
match dt {
BitcompDataType::Char => NVCOMP_TYPE_CHAR,
BitcompDataType::Uint8 => NVCOMP_TYPE_UCHAR,
BitcompDataType::Uint16 => NVCOMP_TYPE_USHORT,
BitcompDataType::Uint32 => NVCOMP_TYPE_UINT,
BitcompDataType::Uint64 => NVCOMP_TYPE_ULONGLONG,
BitcompDataType::Int8 => NVCOMP_TYPE_CHAR,
BitcompDataType::Int16 => NVCOMP_TYPE_SHORT,
BitcompDataType::Int32 => NVCOMP_TYPE_INT,
BitcompDataType::Int64 => NVCOMP_TYPE_LONGLONG,
BitcompDataType::Float32 => NVCOMP_TYPE_FLOAT,
BitcompDataType::Float64 => NVCOMP_TYPE_DOUBLE,
BitcompDataType::BFloat16 => NVCOMP_TYPE_BFLOAT16,
}
}
/// Mutable state for one HLIF codec instance: the opaque C++ shim handle, the
/// CUDA stream it was created with, and three lazily grown device scratch
/// buffers reused across compress/decompress calls.
struct HlifInner {
    // Opaque HLIF manager handle created by the shim.
    handle: *mut c_void,
    // CUDA stream all copies and shim work for this handle run on.
    stream: cudaStream_t,
    // Device staging buffer for uncompressed input (compress path).
    d_in: *mut c_void,
    d_in_cap: usize,
    // Device buffer holding compressed bytes (compress output; also used as
    // the compressed-input staging buffer on the decompress path).
    d_out: *mut c_void,
    d_out_cap: usize,
    // Device buffer for decompressed output (decompress path).
    d_decomp: *mut c_void,
    d_decomp_cap: usize,
}
impl Drop for HlifInner {
    // Teardown order: shim handle first (it was created against `stream`, so
    // destroy it before the stream), then the device pools, then the stream.
    // Errors from cudaFree/cudaStreamDestroy are ignored — there is no way to
    // report failure from drop.
    fn drop(&mut self) {
        if !self.handle.is_null() {
            unsafe { ferro_nvcomp_hlif_destroy(self.handle) };
            self.handle = null_mut();
        }
        for slot in [&mut self.d_in, &mut self.d_out, &mut self.d_decomp] {
            if !slot.is_null() {
                unsafe {
                    let _ = cudaFree(*slot);
                }
                *slot = null_mut();
            }
        }
        if !self.stream.is_null() {
            unsafe { cudaStreamDestroy(self.stream) };
            self.stream = null_mut();
        }
    }
}
impl HlifInner {
fn new(handle: *mut c_void, stream: cudaStream_t) -> Self {
Self {
handle,
stream,
d_in: null_mut(),
d_in_cap: 0,
d_out: null_mut(),
d_out_cap: 0,
d_decomp: null_mut(),
d_decomp_cap: 0,
}
}
fn ensure_buf(slot: &mut *mut c_void, cap: &mut usize, needed: usize) -> Result<()> {
if needed == 0 {
return Ok(());
}
if *cap >= needed {
return Ok(());
}
if !slot.is_null() {
unsafe {
let _ = cudaFree(*slot);
}
*slot = null_mut();
*cap = 0;
}
let alloc_size = needed.div_ceil(1 << 20).max(1) << 20;
check_cuda(
unsafe { cudaMalloc(slot, alloc_size) },
"cudaMalloc(hlif pool)",
)?;
*cap = alloc_size;
Ok(())
}
}
// SAFETY: `HlifInner` exclusively owns its raw handle, stream, and device
// pointers (nothing else aliases them once constructed), and all use goes
// through `Mutex<HlifInner>` in the backend types below. NOTE(review): this
// assumes the shim handle and CUDA stream may be used from a thread other
// than the one that created them — confirm against the shim/CUDA contract.
unsafe impl Send for HlifInner {}
// SAFETY: `&HlifInner` exposes no interior mutability; all mutation happens
// through `&mut` obtained from the backends' Mutex guards.
unsafe impl Sync for HlifInner {}
/// GPU Bitcomp codec backed by the nvCOMP HLIF shim.
///
/// All device state lives in `inner`; the Mutex serializes compress and
/// decompress calls on the single CUDA stream.
pub struct BitcompHlifBackend {
    inner: Mutex<HlifInner>,
    // Element type passed to the Bitcomp manager at creation time.
    data_type: BitcompDataType,
    // HLIF chunk size chosen at creation; also drives max_compressed_len.
    chunk_size: usize,
}
impl std::fmt::Debug for BitcompHlifBackend {
    /// Debug output shows only the configuration; the inner handle/stream
    /// state is raw pointers and is deliberately omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("BitcompHlifBackend");
        builder.field("data_type", &self.data_type);
        builder.field("chunk_size", &self.chunk_size);
        builder.finish()
    }
}
impl BitcompHlifBackend {
pub fn new(data_type: BitcompDataType) -> Result<Self> {
Self::with_chunk_size(data_type, DEFAULT_HLIF_CHUNK_SIZE)
}
pub fn with_chunk_size(data_type: BitcompDataType, chunk_size: usize) -> Result<Self> {
if !cuda_available() {
return Err(Error::BackendUnavailable("no CUDA device available"));
}
let mut stream: cudaStream_t = null_mut();
check_cuda(
unsafe { cudaStreamCreate(&mut stream) },
"cudaStreamCreate(BitcompHlifBackend)",
)?;
let mut handle: *mut c_void = null_mut();
let rc = unsafe {
ferro_nvcomp_hlif_create_bitcomp(
chunk_size,
0,
bitcomp_to_nvcomp_type(data_type),
stream,
&mut handle,
)
};
if rc != 0 {
unsafe { cudaStreamDestroy(stream) };
return Err(Error::Compress(format!(
"ferro_nvcomp_hlif_create_bitcomp failed: rc=0x{rc:x} msg={}",
pull_shim_error()
)));
}
Ok(Self {
inner: Mutex::new(HlifInner::new(handle, stream)),
data_type,
chunk_size,
})
}
pub fn cuda_stream(&self) -> cudaStream_t {
self.inner
.lock()
.expect("BitcompHlifBackend inner poisoned")
.stream
}
}
impl Codec for BitcompHlifBackend {
    fn algo(&self) -> Algo {
        Algo::Bitcomp {
            data_type: self.data_type,
        }
    }

    fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
        let mut guard = self
            .inner
            .lock()
            .expect("BitcompHlifBackend inner poisoned");
        compress_via_hlif(&mut guard, input, output)
    }

    fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
        let mut guard = self
            .inner
            .lock()
            .expect("BitcompHlifBackend inner poisoned");
        decompress_via_hlif(&mut guard, input, output)
    }

    /// Conservative worst-case bound: payload + ~1.6% expansion + fixed
    /// per-chunk overhead + global slack.
    fn max_compressed_len(&self, uncompressed_len: usize) -> usize {
        const PER_CHUNK_OVERHEAD: usize = 64;
        let chunks = uncompressed_len.div_ceil(self.chunk_size).max(1);
        uncompressed_len + uncompressed_len / 64 + PER_CHUNK_OVERHEAD * chunks + 512
    }
}
/// GPU Zstd codec backed by the nvCOMP HLIF shim.
///
/// Mirrors `BitcompHlifBackend`: all device state lives behind the Mutex and
/// operations are serialized on one CUDA stream.
pub struct ZstdHlifBackend {
    inner: Mutex<HlifInner>,
    // HLIF chunk size chosen at creation; also drives max_compressed_len.
    chunk_size: usize,
}
impl std::fmt::Debug for ZstdHlifBackend {
    /// Debug output shows only the configuration; the inner handle/stream
    /// state is raw pointers and is deliberately omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("ZstdHlifBackend");
        builder.field("chunk_size", &self.chunk_size);
        builder.finish()
    }
}
impl ZstdHlifBackend {
pub fn new() -> Result<Self> {
Self::with_chunk_size(DEFAULT_HLIF_CHUNK_SIZE)
}
pub fn with_chunk_size(chunk_size: usize) -> Result<Self> {
if !cuda_available() {
return Err(Error::BackendUnavailable("no CUDA device available"));
}
let mut stream: cudaStream_t = null_mut();
check_cuda(
unsafe { cudaStreamCreate(&mut stream) },
"cudaStreamCreate(ZstdHlifBackend)",
)?;
let mut handle: *mut c_void = null_mut();
let rc = unsafe { ferro_nvcomp_hlif_create_zstd(chunk_size, stream, &mut handle) };
if rc != 0 {
unsafe { cudaStreamDestroy(stream) };
return Err(Error::Compress(format!(
"ferro_nvcomp_hlif_create_zstd failed: rc=0x{rc:x} msg={}",
pull_shim_error()
)));
}
Ok(Self {
inner: Mutex::new(HlifInner::new(handle, stream)),
chunk_size,
})
}
pub fn cuda_stream(&self) -> cudaStream_t {
self.inner
.lock()
.expect("ZstdHlifBackend inner poisoned")
.stream
}
}
impl Codec for ZstdHlifBackend {
    fn algo(&self) -> Algo {
        Algo::Zstd
    }

    fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
        let mut guard = self.inner.lock().expect("ZstdHlifBackend inner poisoned");
        compress_via_hlif(&mut guard, input, output)
    }

    fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
        let mut guard = self.inner.lock().expect("ZstdHlifBackend inner poisoned");
        decompress_via_hlif(&mut guard, input, output)
    }

    /// Conservative worst-case bound: payload + 0.5% expansion + fixed
    /// per-chunk overhead + global slack.
    fn max_compressed_len(&self, uncompressed_len: usize) -> usize {
        const PER_CHUNK_OVERHEAD: usize = 64;
        let chunks = uncompressed_len.div_ceil(self.chunk_size).max(1);
        uncompressed_len + uncompressed_len / 200 + PER_CHUNK_OVERHEAD * chunks + 512
    }
}
/// Compresses `input` through the HLIF shim, appending the result to `output`.
///
/// Empty inputs are encoded as a single 0x00 marker byte (recognized by
/// `decompress_via_hlif`). Device staging buffers in `inner` are grown on
/// demand and reused across calls.
///
/// # Errors
/// Propagates shim and CUDA failures as `Error::Compress`. On any failure the
/// stream is drained (best-effort) so no enqueued work referencing the host
/// buffers can outlive this call and the shared stream is clean for the next
/// operation — previously error paths returned with work possibly pending.
fn compress_via_hlif(inner: &mut HlifInner, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
    if input.is_empty() {
        // Sentinel for the empty payload; the shim is never invoked.
        output.push(0x00);
        return Ok(());
    }
    let res = compress_steps(inner, input, output);
    if res.is_err() {
        // Best-effort drain on the error path; the original error is what we
        // report, so the sync result is intentionally ignored.
        let _ = unsafe { cudaStreamSynchronize(inner.stream) };
    }
    res
}

/// Fallible body of `compress_via_hlif`: size query, H2D staging, compression,
/// and D2H copy-back. Split out so the caller can synchronize the stream on
/// any early error return.
fn compress_steps(inner: &mut HlifInner, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
    // Ask the shim for the worst-case compressed size for this input.
    let mut max_comp_bytes: usize = 0;
    let rc = unsafe {
        ferro_nvcomp_hlif_max_compressed_size(inner.handle, input.len(), &mut max_comp_bytes)
    };
    check_shim(rc, "max_compressed_size")?;
    HlifInner::ensure_buf(&mut inner.d_in, &mut inner.d_in_cap, input.len())?;
    HlifInner::ensure_buf(
        &mut inner.d_out,
        &mut inner.d_out_cap,
        max_comp_bytes.max(1),
    )?;
    // Stage the input on the device, ordered on the backend's stream.
    check_cuda(
        unsafe {
            cudaMemcpyAsync(
                inner.d_in,
                input.as_ptr() as *const c_void,
                input.len(),
                cudaMemcpyKind::cudaMemcpyHostToDevice,
                inner.stream,
            )
        },
        "cudaMemcpyAsync(hlif H2D)",
    )?;
    let mut comp_bytes: usize = 0;
    let rc = unsafe {
        ferro_nvcomp_hlif_compress(
            inner.handle,
            inner.d_in as *const u8,
            input.len(),
            inner.d_out as *mut u8,
            &mut comp_bytes,
        )
    };
    check_shim(rc, "hlif compress")?;
    // Sanity check against the precomputed bound; if this trips, d_out may
    // already have been overrun on the device, so fail loudly.
    if comp_bytes > max_comp_bytes {
        return Err(Error::Compress(format!(
            "HLIF reported comp_bytes={comp_bytes} > pre-computed max={max_comp_bytes}"
        )));
    }
    let start = output.len();
    output.resize(start + comp_bytes, 0);
    check_cuda(
        unsafe {
            cudaMemcpyAsync(
                output[start..].as_mut_ptr() as *mut c_void,
                inner.d_out as *const c_void,
                comp_bytes,
                cudaMemcpyKind::cudaMemcpyDeviceToHost,
                inner.stream,
            )
        },
        "cudaMemcpyAsync(hlif D2H)",
    )?;
    // Make the copied-back bytes visible to the host before returning.
    check_cuda(
        unsafe { cudaStreamSynchronize(inner.stream) },
        "cudaStreamSynchronize(hlif compress D2H)",
    )?;
    Ok(())
}
/// Decompresses an HLIF-framed buffer produced by `compress_via_hlif`,
/// appending the decompressed bytes to `output`.
///
/// A single 0x00 byte is the marker for an originally empty payload and
/// yields no output. The compressed input is staged via `inner.d_out` and the
/// result lands in `inner.d_decomp`.
///
/// # Errors
/// Propagates shim and CUDA failures. On any failure the stream is drained
/// (best-effort) so no enqueued work referencing the host buffers can outlive
/// this call and the shared stream is clean for the next operation —
/// previously error paths returned with work possibly pending.
fn decompress_via_hlif(inner: &mut HlifInner, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
    if input.is_empty() {
        return Err(Error::Decompress(
            "HLIF decompress: empty input".to_string(),
        ));
    }
    if input.len() == 1 && input[0] == 0x00 {
        // Empty-payload marker written by compress_via_hlif.
        return Ok(());
    }
    let res = decompress_steps(inner, input, output);
    if res.is_err() {
        // Best-effort drain on the error path; the original error is what we
        // report, so the sync result is intentionally ignored.
        let _ = unsafe { cudaStreamSynchronize(inner.stream) };
    }
    res
}

/// Fallible body of `decompress_via_hlif`: H2D staging, header size query,
/// decompression, and D2H copy-back. Split out so the caller can synchronize
/// the stream on any early error return.
fn decompress_steps(inner: &mut HlifInner, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
    // Stage the compressed bytes on the device, reusing d_out as scratch.
    HlifInner::ensure_buf(&mut inner.d_out, &mut inner.d_out_cap, input.len())?;
    check_cuda(
        unsafe {
            cudaMemcpyAsync(
                inner.d_out,
                input.as_ptr() as *const c_void,
                input.len(),
                cudaMemcpyKind::cudaMemcpyHostToDevice,
                inner.stream,
            )
        },
        "cudaMemcpyAsync(hlif compressed H2D)",
    )?;
    // Read the decompressed size from the HLIF header so we can size d_decomp.
    let mut decomp_bytes: usize = 0;
    let rc = unsafe {
        ferro_nvcomp_hlif_get_decompressed_output_size(
            inner.handle,
            inner.d_out as *const u8,
            &mut decomp_bytes,
        )
    };
    check_shim(rc, "hlif get_decompressed_output_size")?;
    HlifInner::ensure_buf(
        &mut inner.d_decomp,
        &mut inner.d_decomp_cap,
        decomp_bytes.max(1),
    )?;
    let mut decomp_actual: usize = 0;
    let rc = unsafe {
        ferro_nvcomp_hlif_decompress(
            inner.handle,
            inner.d_out as *const u8,
            inner.d_decomp as *mut u8,
            &mut decomp_actual,
        )
    };
    check_shim(rc, "hlif decompress")?;
    // The header size and the actual output size must agree; a mismatch means
    // a corrupt frame or a shim bug.
    if decomp_actual != decomp_bytes {
        return Err(Error::Decompress(format!(
            "HLIF reported decomp_actual={decomp_actual} != header decomp_bytes={decomp_bytes}"
        )));
    }
    let start = output.len();
    output.resize(start + decomp_bytes, 0);
    check_cuda(
        unsafe {
            cudaMemcpyAsync(
                output[start..].as_mut_ptr() as *mut c_void,
                inner.d_decomp as *const c_void,
                decomp_bytes,
                cudaMemcpyKind::cudaMemcpyDeviceToHost,
                inner.stream,
            )
        },
        "cudaMemcpyAsync(hlif decomp D2H)",
    )?;
    // Make the copied-back bytes visible to the host before returning.
    check_cuda(
        unsafe { cudaStreamSynchronize(inner.stream) },
        "cudaStreamSynchronize(hlif decompress D2H)",
    )?;
    Ok(())
}