#![cfg(feature = "nvcomp-gpu")]
use std::ffi::c_void;
use std::ptr::null_mut;
use super::algo::BitcompDataType;
use super::error::{Error, Result};
use super::nvcomp_sys::cuda::{
CUDA_SUCCESS, cudaError_t, cudaFree, cudaGetErrorString, cudaMalloc, cudaMemcpy,
cudaMemcpyKind, cudaStream_t, cudaStreamSynchronize,
};
use super::nvcomp_sys::nvcomp::{
NVCOMP_BITCOMP_FORMAT_DEFAULT, NVCOMP_TYPE_BFLOAT16, NVCOMP_TYPE_CHAR, NVCOMP_TYPE_DOUBLE,
NVCOMP_TYPE_FLOAT, NVCOMP_TYPE_INT, NVCOMP_TYPE_LONGLONG, NVCOMP_TYPE_SHORT, NVCOMP_TYPE_UCHAR,
NVCOMP_TYPE_UINT, NVCOMP_TYPE_ULONGLONG, NVCOMP_TYPE_USHORT, nvcompBatchedBitcompCompressAsync,
nvcompBatchedBitcompCompressGetMaxOutputChunkSize, nvcompBatchedBitcompCompressGetTempSizeSync,
nvcompBatchedBitcompDecompressAsync, nvcompBatchedBitcompDecompressGetTempSizeAsync,
nvcompBatchedBitcompDecompressOpts_t, nvcompBatchedBitcompFormatOpts, nvcompStatus_t,
nvcompSuccess, nvcompType_t,
};
use super::slab_alloc::SlabAllocator;
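/// Device-side Bitcomp codec built on nvCOMP's batched Bitcomp API.
///
/// The codec keeps small device-resident scratch buffers (pointer, size and
/// status arrays plus a growable temp workspace) so single-chunk and batched
/// calls do not re-allocate on every invocation. All `d_*` fields are raw CUDA
/// device pointers released in `Drop`; the stream is destroyed there only when
/// the codec created it itself.
///
/// A minimal sketch of the intended call sequence (not compiled here; `d_in`,
/// `d_out` and `len` stand for device pointers and a size obtained elsewhere,
/// e.g. via `cudaMalloc`):
///
/// ```ignore
/// let mut codec = BitcompDeviceCodec::new(BitcompDataType::Uint32)?;
/// let max = BitcompDeviceCodec::max_compressed_size(len, BitcompDataType::Uint32)?;
/// let written = unsafe { codec.compress_one(d_in, len, d_out, max)? };
/// unsafe { codec.decompress_one(d_out, written, d_in, len)? };
/// ```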
pub struct BitcompDeviceCodec {
stream: cudaStream_t,
owns_stream: bool,
format_opts: nvcompBatchedBitcompFormatOpts,
decompress_opts: nvcompBatchedBitcompDecompressOpts_t,
d_uncomp_ptrs: *mut c_void,
d_uncomp_sizes: *mut c_void,
d_comp_ptrs: *mut c_void,
d_comp_sizes: *mut c_void,
d_uncomp_buffer_sizes: *mut c_void,
d_statuses: *mut c_void,
d_temp: *mut c_void,
d_temp_cap: usize,
d_batch_comp_ptrs: *mut c_void,
d_batch_comp_sizes: *mut c_void,
d_batch_uncomp_buffer_sizes: *mut c_void,
d_batch_uncomp_sizes: *mut c_void,
d_batch_uncomp_ptrs: *mut c_void,
d_batch_statuses: *mut c_void,
d_batch_cap: usize,
slab: Option<SlabAllocator>,
}
unsafe impl Send for BitcompDeviceCodec {}
unsafe impl Sync for BitcompDeviceCodec {}
impl std::fmt::Debug for BitcompDeviceCodec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BitcompDeviceCodec")
.field("stream_is_null", &self.stream.is_null())
.field("owns_stream", &self.owns_stream)
.field("data_type", &self.format_opts.data_type)
.field("d_temp_cap", &self.d_temp_cap)
.finish()
}
}
impl BitcompDeviceCodec {
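    /// Creates a codec that owns a freshly created CUDA stream (destroyed on drop).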
pub fn new(data_type: BitcompDataType) -> Result<Self> {
Self::new_inner(data_type, false)
}
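    /// Creates a codec that runs on a caller-provided stream; the stream is
    /// not destroyed when the codec is dropped.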
pub fn with_stream(data_type: BitcompDataType, stream: cudaStream_t) -> Result<Self> {
        Self::with_stream_internal(data_type, stream, false, false)
}
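    /// Same as [`Self::new`], but also attaches a [`SlabAllocator`] so
    /// [`Self::decompress_batch_into_slab`] can be used.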
pub fn new_with_slab(data_type: BitcompDataType) -> Result<Self> {
Self::new_inner(data_type, true)
}
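    /// Same as [`Self::with_stream`], but also attaches a [`SlabAllocator`].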
pub fn with_stream_and_slab(data_type: BitcompDataType, stream: cudaStream_t) -> Result<Self> {
        Self::with_stream_internal(data_type, stream, false, true)
}
fn new_inner(data_type: BitcompDataType, with_slab: bool) -> Result<Self> {
let mut stream: cudaStream_t = null_mut();
let rc = unsafe { super::nvcomp_sys::cuda::cudaStreamCreate(&mut stream) };
check_cuda(rc, "cudaStreamCreate(BitcompDeviceCodec)")?;
match Self::with_stream_internal(data_type, stream, true, with_slab) {
Ok(c) => Ok(c),
Err(e) => {
unsafe {
let _ = super::nvcomp_sys::cuda::cudaStreamDestroy(stream);
}
Err(e)
}
}
}
fn with_stream_internal(
data_type: BitcompDataType,
stream: cudaStream_t,
owns_stream: bool,
with_slab: bool,
) -> Result<Self> {
let format_opts = bitcomp_format_opts(data_type);
let decompress_opts = nvcompBatchedBitcompDecompressOpts_t::default();
let mut codec = Self {
stream,
owns_stream,
format_opts,
decompress_opts,
d_uncomp_ptrs: null_mut(),
d_uncomp_sizes: null_mut(),
d_comp_ptrs: null_mut(),
d_comp_sizes: null_mut(),
d_uncomp_buffer_sizes: null_mut(),
d_statuses: null_mut(),
d_temp: null_mut(),
d_temp_cap: 0,
d_batch_comp_ptrs: null_mut(),
d_batch_comp_sizes: null_mut(),
d_batch_uncomp_buffer_sizes: null_mut(),
d_batch_uncomp_sizes: null_mut(),
d_batch_uncomp_ptrs: null_mut(),
d_batch_statuses: null_mut(),
d_batch_cap: 0,
slab: if with_slab {
Some(SlabAllocator::new())
} else {
None
},
};
codec.alloc_metadata_singletons()?;
Ok(codec)
}
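    /// Returns an upper bound, in bytes, for the compressed output of a single
    /// chunk of `uncompressed_size` bytes, as reported by nvCOMP and rounded up
    /// to the next multiple of 256.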
pub fn max_compressed_size(
uncompressed_size: usize,
data_type: BitcompDataType,
) -> Result<usize> {
let format_opts = bitcomp_format_opts(data_type);
let mut max_out: usize = 0;
let status = unsafe {
nvcompBatchedBitcompCompressGetMaxOutputChunkSize(
uncompressed_size,
format_opts,
&mut max_out,
)
};
check_nvcomp(status, "nvcompBatchedBitcompCompressGetMaxOutputChunkSize")?;
Ok(max_out.div_ceil(256) * 256)
}
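    /// Compresses one chunk that already resides in device memory and returns
    /// the number of compressed bytes written.
    ///
    /// The chunk must be non-empty and at most 16 MiB. `max_comp_size` should
    /// come from [`Self::max_compressed_size`]; the call fails if nvCOMP
    /// reports a larger output.
    ///
    /// # Safety
    /// `d_uncompressed` and `d_compressed` must be valid device pointers for at
    /// least `uncomp_size` and `max_comp_size` bytes respectively, and must stay
    /// valid until this call returns (the stream is synchronized internally).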
pub unsafe fn compress_one(
&mut self,
d_uncompressed: *const c_void,
uncomp_size: usize,
d_compressed: *mut c_void,
max_comp_size: usize,
) -> Result<usize> {
if uncomp_size == 0 {
return Err(Error::Compress(
"BitcompDeviceCodec::compress_one: uncomp_size must be > 0".into(),
));
}
if uncomp_size > (1 << 24) {
return Err(Error::Compress(format!(
"BitcompDeviceCodec::compress_one: uncomp_size {uncomp_size} exceeds 16 MiB \
single-chunk limit; split across multiple calls"
)));
}
let mut temp_bytes: usize = 0;
let h_uncomp_ptr = d_uncompressed;
let h_uncomp_size = uncomp_size;
let status = unsafe {
nvcompBatchedBitcompCompressGetTempSizeSync(
&h_uncomp_ptr as *const *const c_void,
&h_uncomp_size as *const usize,
1,
uncomp_size,
self.format_opts,
&mut temp_bytes,
uncomp_size,
self.stream,
)
};
check_nvcomp(status, "nvcompBatchedBitcompCompressGetTempSizeSync")?;
self.ensure_d_temp(temp_bytes)?;
let h_comp_ptr = d_compressed;
unsafe {
self.h2d_singleton_ptr(self.d_uncomp_ptrs, h_uncomp_ptr)?;
self.h2d_singleton_size(self.d_uncomp_sizes, h_uncomp_size)?;
self.h2d_singleton_ptr(self.d_comp_ptrs, h_comp_ptr)?;
}
let status = unsafe {
nvcompBatchedBitcompCompressAsync(
self.d_uncomp_ptrs as *const *const c_void,
self.d_uncomp_sizes as *const usize,
uncomp_size,
1,
self.d_temp,
self.d_temp_cap,
self.d_comp_ptrs as *const *mut c_void,
self.d_comp_sizes as *mut usize,
self.format_opts,
self.d_statuses as *mut nvcompStatus_t,
self.stream,
)
};
check_nvcomp(status, "nvcompBatchedBitcompCompressAsync")?;
let rc = unsafe { cudaStreamSynchronize(self.stream) };
check_cuda(rc, "cudaStreamSynchronize(compress)")?;
let mut actual_comp_size: usize = 0;
let rc = unsafe {
cudaMemcpy(
&mut actual_comp_size as *mut usize as *mut c_void,
self.d_comp_sizes,
std::mem::size_of::<usize>(),
cudaMemcpyKind::cudaMemcpyDeviceToHost,
)
};
check_cuda(rc, "cudaMemcpy(d_comp_sizes D2H)")?;
let mut status_h: nvcompStatus_t = 0;
let rc = unsafe {
cudaMemcpy(
&mut status_h as *mut nvcompStatus_t as *mut c_void,
self.d_statuses,
std::mem::size_of::<nvcompStatus_t>(),
cudaMemcpyKind::cudaMemcpyDeviceToHost,
)
};
check_cuda(rc, "cudaMemcpy(d_statuses D2H)")?;
check_nvcomp(status_h, "Bitcomp compress per-chunk status")?;
if actual_comp_size > max_comp_size {
return Err(Error::Compress(format!(
"Bitcomp compress wrote {actual_comp_size} bytes but caller bounded at \
{max_comp_size}; max_compressed_size() must be honoured"
)));
}
Ok(actual_comp_size)
}
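    /// Decompresses one Bitcomp chunk that already resides in device memory.
    ///
    /// Fails if nvCOMP reports a per-chunk error or if the decompressed size
    /// does not match `expected_uncomp_size`.
    ///
    /// # Safety
    /// `d_compressed` and `d_uncompressed` must be valid device pointers for at
    /// least `comp_size` and `expected_uncomp_size` bytes respectively.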
pub unsafe fn decompress_one(
&mut self,
d_compressed: *const c_void,
comp_size: usize,
d_uncompressed: *mut c_void,
expected_uncomp_size: usize,
) -> Result<()> {
if comp_size == 0 {
return Err(Error::Decompress(
"BitcompDeviceCodec::decompress_one: comp_size must be > 0".into(),
));
}
if expected_uncomp_size == 0 {
return Err(Error::Decompress(
"BitcompDeviceCodec::decompress_one: expected_uncomp_size must be > 0".into(),
));
}
let mut temp_bytes: usize = 0;
let status = unsafe {
nvcompBatchedBitcompDecompressGetTempSizeAsync(
1,
expected_uncomp_size,
self.decompress_opts,
&mut temp_bytes,
expected_uncomp_size,
)
};
check_nvcomp(status, "nvcompBatchedBitcompDecompressGetTempSizeAsync")?;
self.ensure_d_temp(temp_bytes)?;
unsafe {
self.h2d_singleton_ptr(self.d_comp_ptrs, d_compressed)?;
self.h2d_singleton_size(self.d_comp_sizes, comp_size)?;
self.h2d_singleton_size(self.d_uncomp_buffer_sizes, expected_uncomp_size)?;
self.h2d_singleton_ptr(self.d_uncomp_ptrs, d_uncompressed)?;
}
let status = unsafe {
nvcompBatchedBitcompDecompressAsync(
self.d_comp_ptrs as *const *const c_void,
self.d_comp_sizes as *const usize,
self.d_uncomp_buffer_sizes as *const usize,
self.d_uncomp_sizes as *mut usize,
1,
self.d_temp,
self.d_temp_cap,
self.d_uncomp_ptrs as *const *mut c_void,
self.decompress_opts,
self.d_statuses as *mut nvcompStatus_t,
self.stream,
)
};
check_nvcomp(status, "nvcompBatchedBitcompDecompressAsync")?;
let rc = unsafe { cudaStreamSynchronize(self.stream) };
check_cuda(rc, "cudaStreamSynchronize(decompress)")?;
let mut status_h: nvcompStatus_t = 0;
let rc = unsafe {
cudaMemcpy(
&mut status_h as *mut nvcompStatus_t as *mut c_void,
self.d_statuses,
std::mem::size_of::<nvcompStatus_t>(),
cudaMemcpyKind::cudaMemcpyDeviceToHost,
)
};
check_cuda(rc, "cudaMemcpy(d_statuses D2H)")?;
check_nvcomp(status_h, "Bitcomp decompress per-chunk status")?;
let mut actual_uncomp_size: usize = 0;
let rc = unsafe {
cudaMemcpy(
&mut actual_uncomp_size as *mut usize as *mut c_void,
self.d_uncomp_sizes,
std::mem::size_of::<usize>(),
cudaMemcpyKind::cudaMemcpyDeviceToHost,
)
};
check_cuda(rc, "cudaMemcpy(d_uncomp_sizes D2H)")?;
if actual_uncomp_size != expected_uncomp_size {
return Err(Error::Decompress(format!(
"Bitcomp decompress produced {actual_uncomp_size} bytes but caller expected \
{expected_uncomp_size}; cache entry corruption?"
)));
}
Ok(())
}
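    /// Decompresses several chunks in a single batched nvCOMP call.
    ///
    /// Each entry is `(d_compressed, comp_size, expected_uncomp_size, d_uncompressed)`.
    /// Per-chunk statuses and output sizes are verified after the stream is
    /// synchronized.
    ///
    /// # Safety
    /// Every pointer in `entries` must be a valid device pointer for the size
    /// recorded alongside it.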
pub unsafe fn decompress_batch(
&mut self,
entries: &[(*const c_void, usize, usize, *mut c_void)],
) -> Result<()> {
if entries.is_empty() {
return Err(Error::Decompress(
"BitcompDeviceCodec::decompress_batch: entries must be non-empty".into(),
));
}
for (i, (_, comp_size, expected_uncomp_size, _)) in entries.iter().enumerate() {
if *comp_size == 0 {
return Err(Error::Decompress(format!(
"BitcompDeviceCodec::decompress_batch: chunk {i} comp_size == 0"
)));
}
if *expected_uncomp_size == 0 {
return Err(Error::Decompress(format!(
"BitcompDeviceCodec::decompress_batch: chunk {i} expected_uncomp_size == 0"
)));
}
}
let n = entries.len();
let max_uncomp = entries.iter().map(|(_, _, eu, _)| *eu).max().unwrap_or(0);
let total_uncomp = entries.iter().map(|(_, _, eu, _)| *eu).sum::<usize>();
self.ensure_batch_capacity(n)?;
let mut temp_bytes: usize = 0;
let status = unsafe {
nvcompBatchedBitcompDecompressGetTempSizeAsync(
n,
max_uncomp,
self.decompress_opts,
&mut temp_bytes,
total_uncomp,
)
};
check_nvcomp(
status,
"nvcompBatchedBitcompDecompressGetTempSizeAsync(batch)",
)?;
self.ensure_d_temp(temp_bytes)?;
let mut h_comp_ptrs: Vec<*const c_void> = Vec::with_capacity(n);
let mut h_comp_sizes: Vec<usize> = Vec::with_capacity(n);
let mut h_uncomp_buffer_sizes: Vec<usize> = Vec::with_capacity(n);
let mut h_uncomp_ptrs: Vec<*mut c_void> = Vec::with_capacity(n);
for (d_comp, comp_size, expected_uncomp, d_uncomp) in entries {
h_comp_ptrs.push(*d_comp);
h_comp_sizes.push(*comp_size);
h_uncomp_buffer_sizes.push(*expected_uncomp);
h_uncomp_ptrs.push(*d_uncomp);
}
unsafe {
self.h2d_array(
self.d_batch_comp_ptrs,
h_comp_ptrs.as_ptr() as *const c_void,
n * std::mem::size_of::<*const c_void>(),
)?;
self.h2d_array(
self.d_batch_comp_sizes,
h_comp_sizes.as_ptr() as *const c_void,
n * std::mem::size_of::<usize>(),
)?;
self.h2d_array(
self.d_batch_uncomp_buffer_sizes,
h_uncomp_buffer_sizes.as_ptr() as *const c_void,
n * std::mem::size_of::<usize>(),
)?;
self.h2d_array(
self.d_batch_uncomp_ptrs,
h_uncomp_ptrs.as_ptr() as *const c_void,
n * std::mem::size_of::<*mut c_void>(),
)?;
}
let status = unsafe {
nvcompBatchedBitcompDecompressAsync(
self.d_batch_comp_ptrs as *const *const c_void,
self.d_batch_comp_sizes as *const usize,
self.d_batch_uncomp_buffer_sizes as *const usize,
self.d_batch_uncomp_sizes as *mut usize,
n,
self.d_temp,
self.d_temp_cap,
self.d_batch_uncomp_ptrs as *const *mut c_void,
self.decompress_opts,
self.d_batch_statuses as *mut nvcompStatus_t,
self.stream,
)
};
check_nvcomp(status, "nvcompBatchedBitcompDecompressAsync(batch)")?;
let rc = unsafe { cudaStreamSynchronize(self.stream) };
check_cuda(rc, "cudaStreamSynchronize(decompress_batch)")?;
let mut statuses_h: Vec<nvcompStatus_t> = vec![0; n];
let rc = unsafe {
cudaMemcpy(
statuses_h.as_mut_ptr() as *mut c_void,
self.d_batch_statuses,
n * std::mem::size_of::<nvcompStatus_t>(),
cudaMemcpyKind::cudaMemcpyDeviceToHost,
)
};
check_cuda(rc, "cudaMemcpy(d_batch_statuses D2H)")?;
let mut actual_sizes_h: Vec<usize> = vec![0; n];
let rc = unsafe {
cudaMemcpy(
actual_sizes_h.as_mut_ptr() as *mut c_void,
self.d_batch_uncomp_sizes,
n * std::mem::size_of::<usize>(),
cudaMemcpyKind::cudaMemcpyDeviceToHost,
)
};
check_cuda(rc, "cudaMemcpy(d_batch_uncomp_sizes D2H)")?;
for (i, status_h) in statuses_h.iter().enumerate() {
if *status_h != nvcompSuccess {
return Err(Error::Decompress(format!(
"Bitcomp decompress_batch: chunk {i} status={} ({})",
*status_h,
super::nvcomp_sys::nvcomp::status_str(*status_h),
)));
}
let expected = entries[i].2;
if actual_sizes_h[i] != expected {
return Err(Error::Decompress(format!(
"Bitcomp decompress_batch: chunk {i} produced {} bytes but caller expected {}; \
cache entry corruption?",
actual_sizes_h[i], expected,
)));
}
}
Ok(())
}
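    /// Like [`Self::decompress_batch`], but output buffers are drawn from the
    /// codec's slab allocator and returned as `(ptr, size)` pairs.
    ///
    /// Requires a codec built with [`Self::new_with_slab`] or
    /// [`Self::with_stream_and_slab`]. On any failure, slab allocations made so
    /// far are released; on success the caller must eventually return the
    /// outputs via [`Self::release_slab_outputs`].
    ///
    /// # Safety
    /// Every compressed pointer in `entries` must be a valid device pointer for
    /// the accompanying `comp_size`.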
pub unsafe fn decompress_batch_into_slab(
&mut self,
entries: &[(*const c_void, usize, usize)],
) -> Result<Vec<(*mut c_void, usize)>> {
if self.slab.is_none() {
return Err(Error::Decompress(
"BitcompDeviceCodec::decompress_batch_into_slab: codec has no slab; \
use new_with_slab() or with_stream_and_slab()"
.into(),
));
}
if entries.is_empty() {
return Err(Error::Decompress(
"BitcompDeviceCodec::decompress_batch_into_slab: entries must be non-empty".into(),
));
}
for (i, (_, comp_size, expected_uncomp_size)) in entries.iter().enumerate() {
if *comp_size == 0 {
return Err(Error::Decompress(format!(
"BitcompDeviceCodec::decompress_batch_into_slab: chunk {i} comp_size == 0"
)));
}
if *expected_uncomp_size == 0 {
return Err(Error::Decompress(format!(
"BitcompDeviceCodec::decompress_batch_into_slab: \
chunk {i} expected_uncomp_size == 0"
)));
}
}
let mut outputs: Vec<(*mut c_void, usize)> = Vec::with_capacity(entries.len());
for (i, (_, _, expected_uncomp)) in entries.iter().enumerate() {
let slab = self.slab.as_mut().expect("checked above");
match slab.alloc(*expected_uncomp) {
Ok(p) => outputs.push((p, *expected_uncomp)),
Err(e) => {
for (p, sz) in outputs.drain(..) {
unsafe {
self.slab.as_mut().expect("checked").release(p, sz);
}
}
return Err(Error::Decompress(format!(
"decompress_batch_into_slab: slab alloc for chunk {i} failed: {e}"
)));
}
}
}
let dispatch_entries: Vec<(*const c_void, usize, usize, *mut c_void)> = entries
.iter()
.zip(outputs.iter())
.map(|((d_comp, cs, eu), (out_ptr, _))| (*d_comp, *cs, *eu, *out_ptr))
.collect();
let rc = unsafe { self.decompress_batch(&dispatch_entries) };
if let Err(e) = rc {
for (p, sz) in outputs.drain(..) {
unsafe {
self.slab.as_mut().expect("checked").release(p, sz);
}
}
return Err(e);
}
Ok(outputs)
}
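    /// Returns slab-backed outputs from [`Self::decompress_batch_into_slab`] to
    /// the allocator. A no-op when the codec has no slab.
    ///
    /// # Safety
    /// Each `(ptr, size)` pair must have been produced by this codec's slab and
    /// must not be used after this call.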
pub unsafe fn release_slab_outputs(&mut self, outputs: &[(*mut c_void, usize)]) {
if let Some(slab) = self.slab.as_mut() {
for (p, sz) in outputs {
unsafe {
slab.release(*p, *sz);
}
}
}
}
pub fn slab(&self) -> Option<&SlabAllocator> {
self.slab.as_ref()
}
pub fn slab_mut(&mut self) -> Option<&mut SlabAllocator> {
self.slab.as_mut()
}
fn ensure_batch_capacity(&mut self, n: usize) -> Result<()> {
if self.d_batch_cap >= n {
return Ok(());
}
let new_cap = n.max(self.d_batch_cap * 2).max(16);
unsafe {
let slots: [&mut *mut c_void; 6] = [
&mut self.d_batch_comp_ptrs,
&mut self.d_batch_comp_sizes,
&mut self.d_batch_uncomp_buffer_sizes,
&mut self.d_batch_uncomp_sizes,
&mut self.d_batch_uncomp_ptrs,
&mut self.d_batch_statuses,
];
for slot in slots {
let p = std::mem::replace(slot, null_mut());
if !p.is_null() {
let _ = cudaFree(p);
}
}
self.d_batch_cap = 0;
}
let ptr_bytes = new_cap * std::mem::size_of::<*const c_void>();
let size_bytes = new_cap * std::mem::size_of::<usize>();
let status_bytes = new_cap * std::mem::size_of::<nvcompStatus_t>();
macro_rules! alloc_dev {
($field:ident, $bytes:expr, $name:literal) => {{
let mut p: *mut c_void = null_mut();
let rc = unsafe { cudaMalloc(&mut p, $bytes) };
if rc != CUDA_SUCCESS {
return Err(Error::Compress(format!(
concat!("cudaMalloc(", $name, ", {} bytes) failed: code={}"),
$bytes, rc
)));
}
self.$field = p;
}};
}
alloc_dev!(d_batch_comp_ptrs, ptr_bytes, "d_batch_comp_ptrs");
alloc_dev!(d_batch_comp_sizes, size_bytes, "d_batch_comp_sizes");
alloc_dev!(
d_batch_uncomp_buffer_sizes,
size_bytes,
"d_batch_uncomp_buffer_sizes"
);
alloc_dev!(d_batch_uncomp_sizes, size_bytes, "d_batch_uncomp_sizes");
alloc_dev!(d_batch_uncomp_ptrs, ptr_bytes, "d_batch_uncomp_ptrs");
alloc_dev!(d_batch_statuses, status_bytes, "d_batch_statuses");
self.d_batch_cap = new_cap;
Ok(())
}
unsafe fn h2d_array(
&self,
slot: *mut c_void,
host_src: *const c_void,
bytes: usize,
) -> Result<()> {
let rc = unsafe {
cudaMemcpy(
slot,
host_src,
bytes,
cudaMemcpyKind::cudaMemcpyHostToDevice,
)
};
check_cuda(rc, "cudaMemcpy(h2d_array)")
}
fn alloc_metadata_singletons(&mut self) -> Result<()> {
macro_rules! alloc_dev {
($field:ident, $bytes:expr, $name:literal) => {{
let mut p: *mut c_void = null_mut();
let rc = unsafe { cudaMalloc(&mut p, $bytes) };
if rc != CUDA_SUCCESS {
self.free_metadata_singletons();
return Err(Error::Compress(format!(
concat!("cudaMalloc(", $name, ") failed: code={}"),
rc
)));
}
self.$field = p;
}};
}
alloc_dev!(
d_uncomp_ptrs,
std::mem::size_of::<*const c_void>(),
"d_uncomp_ptrs"
);
alloc_dev!(
d_uncomp_sizes,
std::mem::size_of::<usize>(),
"d_uncomp_sizes"
);
alloc_dev!(
d_comp_ptrs,
std::mem::size_of::<*const c_void>(),
"d_comp_ptrs"
);
alloc_dev!(d_comp_sizes, std::mem::size_of::<usize>(), "d_comp_sizes");
alloc_dev!(
d_uncomp_buffer_sizes,
std::mem::size_of::<usize>(),
"d_uncomp_buffer_sizes"
);
alloc_dev!(
d_statuses,
std::mem::size_of::<nvcompStatus_t>(),
"d_statuses"
);
Ok(())
}
fn free_metadata_singletons(&mut self) {
unsafe {
let slots: [&mut *mut c_void; 6] = [
&mut self.d_uncomp_ptrs,
&mut self.d_uncomp_sizes,
&mut self.d_comp_ptrs,
&mut self.d_comp_sizes,
&mut self.d_uncomp_buffer_sizes,
&mut self.d_statuses,
];
for slot in slots {
let p = std::mem::replace(slot, null_mut());
if !p.is_null() {
let _ = cudaFree(p);
}
}
}
}
fn ensure_d_temp(&mut self, needed: usize) -> Result<()> {
if self.d_temp_cap >= needed {
return Ok(());
}
let new_cap = needed.max(self.d_temp_cap * 2).max(64 * 1024);
unsafe {
if !self.d_temp.is_null() {
let _ = cudaFree(self.d_temp);
self.d_temp = null_mut();
self.d_temp_cap = 0;
}
let mut p: *mut c_void = null_mut();
let rc = cudaMalloc(&mut p, new_cap);
if rc != CUDA_SUCCESS {
return Err(Error::Compress(format!(
"cudaMalloc(d_temp, {new_cap} bytes) failed: code={rc}"
)));
}
self.d_temp = p;
self.d_temp_cap = new_cap;
}
Ok(())
}
unsafe fn h2d_singleton_ptr(&self, slot: *mut c_void, value: *const c_void) -> Result<()> {
let host_value = value;
let rc = unsafe {
cudaMemcpy(
slot,
&host_value as *const *const c_void as *const c_void,
std::mem::size_of::<*const c_void>(),
cudaMemcpyKind::cudaMemcpyHostToDevice,
)
};
check_cuda(rc, "cudaMemcpy(h2d_singleton_ptr)")
}
unsafe fn h2d_singleton_size(&self, slot: *mut c_void, value: usize) -> Result<()> {
let host_value = value;
let rc = unsafe {
cudaMemcpy(
slot,
&host_value as *const usize as *const c_void,
std::mem::size_of::<usize>(),
cudaMemcpyKind::cudaMemcpyHostToDevice,
)
};
check_cuda(rc, "cudaMemcpy(h2d_singleton_size)")
}
pub fn stream(&self) -> cudaStream_t {
self.stream
}
}
impl Drop for BitcompDeviceCodec {
fn drop(&mut self) {
self.free_metadata_singletons();
unsafe {
let slots: [&mut *mut c_void; 6] = [
&mut self.d_batch_comp_ptrs,
&mut self.d_batch_comp_sizes,
&mut self.d_batch_uncomp_buffer_sizes,
&mut self.d_batch_uncomp_sizes,
&mut self.d_batch_uncomp_ptrs,
&mut self.d_batch_statuses,
];
for slot in slots {
let p = std::mem::replace(slot, null_mut());
if !p.is_null() {
let _ = cudaFree(p);
}
}
self.d_batch_cap = 0;
}
unsafe {
if !self.d_temp.is_null() {
let _ = cudaFree(self.d_temp);
self.d_temp = null_mut();
}
}
if self.owns_stream && !self.stream.is_null() {
unsafe {
let _ = super::nvcomp_sys::cuda::cudaStreamDestroy(self.stream);
}
self.stream = null_mut();
}
}
}
fn check_cuda(rc: cudaError_t, what: &'static str) -> Result<()> {
if rc == CUDA_SUCCESS {
return Ok(());
}
let msg = unsafe {
let s = cudaGetErrorString(rc);
if s.is_null() {
"<null>".to_string()
} else {
std::ffi::CStr::from_ptr(s).to_string_lossy().into_owned()
}
};
Err(Error::Compress(format!(
"CUDA error in {what}: {msg} (code={rc})"
)))
}
fn check_nvcomp(status: nvcompStatus_t, what: &'static str) -> Result<()> {
if status == nvcompSuccess {
Ok(())
} else {
Err(Error::Compress(format!(
"nvCOMP error in {what}: code={status}"
)))
}
}
fn bitcomp_to_nvcomp_type(dt: BitcompDataType) -> nvcompType_t {
match dt {
BitcompDataType::Char => NVCOMP_TYPE_CHAR,
BitcompDataType::Uint8 => NVCOMP_TYPE_UCHAR,
BitcompDataType::Uint16 => NVCOMP_TYPE_USHORT,
BitcompDataType::Uint32 => NVCOMP_TYPE_UINT,
BitcompDataType::Uint64 => NVCOMP_TYPE_ULONGLONG,
BitcompDataType::Int8 => NVCOMP_TYPE_CHAR,
BitcompDataType::Int16 => NVCOMP_TYPE_SHORT,
BitcompDataType::Int32 => NVCOMP_TYPE_INT,
BitcompDataType::Int64 => NVCOMP_TYPE_LONGLONG,
BitcompDataType::Float32 => NVCOMP_TYPE_FLOAT,
BitcompDataType::Float64 => NVCOMP_TYPE_DOUBLE,
BitcompDataType::BFloat16 => NVCOMP_TYPE_BFLOAT16,
}
}
fn bitcomp_format_opts(dt: BitcompDataType) -> nvcompBatchedBitcompFormatOpts {
nvcompBatchedBitcompFormatOpts {
algorithm_type: NVCOMP_BITCOMP_FORMAT_DEFAULT as std::ffi::c_int,
data_type: bitcomp_to_nvcomp_type(dt),
reserved: [0; 56],
}
}
#[cfg(test)]
mod tests {
use crate::ferro_compress::nvcomp_sys::cuda::cudaMemcpy;
use super::*;
fn try_codec() -> Option<BitcompDeviceCodec> {
BitcompDeviceCodec::new(BitcompDataType::Uint32).ok()
}
unsafe fn upload_to_device(host: &[u8]) -> *mut c_void {
let mut p: *mut c_void = null_mut();
let rc = unsafe { cudaMalloc(&mut p, host.len()) };
assert_eq!(rc, CUDA_SUCCESS);
let rc = unsafe {
cudaMemcpy(
p,
host.as_ptr() as *const c_void,
host.len(),
cudaMemcpyKind::cudaMemcpyHostToDevice,
)
};
assert_eq!(rc, CUDA_SUCCESS);
p
}
unsafe fn alloc_device(bytes: usize) -> *mut c_void {
let mut p: *mut c_void = null_mut();
let rc = unsafe { cudaMalloc(&mut p, bytes) };
assert_eq!(rc, CUDA_SUCCESS);
p
}
unsafe fn download_from_device(d_src: *const c_void, host: &mut [u8]) {
let rc = unsafe {
cudaMemcpy(
host.as_mut_ptr() as *mut c_void,
d_src,
host.len(),
cudaMemcpyKind::cudaMemcpyDeviceToHost,
)
};
assert_eq!(rc, CUDA_SUCCESS);
}
#[test]
fn roundtrip_small_uint32() {
let Some(mut codec) = try_codec() else { return };
let words: Vec<u32> = (0..2048u32).map(|i| (i * 31) & 0xff_ff).collect();
let input_bytes: &[u8] =
unsafe { std::slice::from_raw_parts(words.as_ptr() as *const u8, words.len() * 4) };
let max_comp =
BitcompDeviceCodec::max_compressed_size(input_bytes.len(), BitcompDataType::Uint32)
.unwrap();
unsafe {
let d_uncomp = upload_to_device(input_bytes);
let d_comp = alloc_device(max_comp);
let actual_comp = codec
.compress_one(d_uncomp, input_bytes.len(), d_comp, max_comp)
.expect("compress");
assert!(actual_comp > 0);
assert!(actual_comp <= max_comp);
let d_decomp = alloc_device(input_bytes.len());
codec
.decompress_one(d_comp, actual_comp, d_decomp, input_bytes.len())
.expect("decompress");
let mut got = vec![0u8; input_bytes.len()];
download_from_device(d_decomp, &mut got);
assert_eq!(got, input_bytes);
let _ = cudaFree(d_uncomp);
let _ = cudaFree(d_comp);
let _ = cudaFree(d_decomp);
}
}
#[test]
fn compress_zero_size_is_error() {
let Some(mut codec) = try_codec() else { return };
let res = unsafe { codec.compress_one(null_mut(), 0, null_mut(), 0) };
assert!(res.is_err());
}
#[test]
fn decompress_zero_size_is_error() {
let Some(mut codec) = try_codec() else { return };
let res = unsafe { codec.decompress_one(null_mut(), 0, null_mut(), 1) };
assert!(res.is_err());
let res = unsafe { codec.decompress_one(null_mut(), 1, null_mut(), 0) };
assert!(res.is_err());
}
#[test]
fn compress_oversize_chunk_is_error() {
let Some(mut codec) = try_codec() else { return };
let res = unsafe { codec.compress_one(null_mut(), (1 << 24) + 1, null_mut(), 1 << 25) };
assert!(res.is_err());
}
#[test]
fn max_compressed_size_round_to_256() {
let n = BitcompDeviceCodec::max_compressed_size(8192, BitcompDataType::Uint32)
.expect("max_compressed_size");
assert!(n >= 8192);
assert_eq!(n % 256, 0);
}
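    #[test]
    fn roundtrip_batch_uint32() {
        // A minimal batched roundtrip, mirroring roundtrip_small_uint32: two
        // chunks are compressed individually with compress_one and then
        // decompressed together through decompress_batch. Like the other tests
        // in this module it silently skips when no CUDA device is available.
        let Some(mut codec) = try_codec() else { return };
        let chunk_a: Vec<u32> = (0..1024u32).map(|i| i & 0x3ff).collect();
        let chunk_b: Vec<u32> = (0..512u32).map(|i| (i * 7) & 0xffff).collect();
        let bytes_a: &[u8] =
            unsafe { std::slice::from_raw_parts(chunk_a.as_ptr() as *const u8, chunk_a.len() * 4) };
        let bytes_b: &[u8] =
            unsafe { std::slice::from_raw_parts(chunk_b.as_ptr() as *const u8, chunk_b.len() * 4) };
        unsafe {
            // Compress each chunk on its own; compress_one synchronizes the
            // stream, so the uncompressed device buffer can be freed right away.
            let mut comp: Vec<(*mut c_void, usize, &[u8])> = Vec::new();
            for input in [bytes_a, bytes_b] {
                let max_comp =
                    BitcompDeviceCodec::max_compressed_size(input.len(), BitcompDataType::Uint32)
                        .unwrap();
                let d_uncomp = upload_to_device(input);
                let d_comp = alloc_device(max_comp);
                let written = codec
                    .compress_one(d_uncomp, input.len(), d_comp, max_comp)
                    .expect("compress");
                let _ = cudaFree(d_uncomp);
                comp.push((d_comp, written, input));
            }
            // Decompress both chunks in one batched call.
            let mut d_outputs: Vec<*mut c_void> = Vec::with_capacity(comp.len());
            for (_, _, input) in &comp {
                d_outputs.push(alloc_device(input.len()));
            }
            let entries: Vec<(*const c_void, usize, usize, *mut c_void)> = comp
                .iter()
                .zip(d_outputs.iter())
                .map(|((d_comp, written, input), d_out)| {
                    (*d_comp as *const c_void, *written, input.len(), *d_out)
                })
                .collect();
            codec.decompress_batch(&entries).expect("decompress_batch");
            for ((_, _, input), d_out) in comp.iter().zip(d_outputs.iter()) {
                let mut got = vec![0u8; input.len()];
                download_from_device(*d_out, &mut got);
                assert_eq!(got, *input);
            }
            for (d_comp, _, _) in &comp {
                let _ = cudaFree(*d_comp);
            }
            for d_out in d_outputs {
                let _ = cudaFree(d_out);
            }
        }
    }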
}