1#[cfg(any(feature = "nvcomp-gpu", test))]
32pub use crate::{MAX_DECOMPRESSED_BYTES, validate_decompress_manifest};
33
34#[cfg(feature = "nvcomp-gpu")]
35mod imp {
36 use std::sync::Arc;
37
38 use crate::ferro_compress::{Algo, BitcompDataType, Codec as FerroCodec, NvcompCodec};
39 use bytes::Bytes;
40
41 use super::validate_decompress_manifest;
42 use crate::{ChunkManifest, Codec, CodecError, CodecKind, DECOMPRESS_BOOTSTRAP_CAPACITY};
43
    /// Codec handle for the nvCOMP zstd backend.
    ///
    /// The backend codec is kept behind an [`Arc`] so that every async call can clone
    /// a handle and move it into a blocking task.
    pub struct NvcompZstdCodec {
        /// Shared backend codec; constructed with `Algo::Zstd` by [`NvcompZstdCodec::new`].
        inner: Arc<NvcompCodec>,
    }
48
49 impl NvcompZstdCodec {
50 pub fn new() -> Result<Self, CodecError> {
51 let inner = NvcompCodec::new(Algo::Zstd)
52 .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp zstd init: {e}")))?;
53 Ok(Self {
54 inner: Arc::new(inner),
55 })
56 }
57 }
58
59 #[async_trait::async_trait]
60 impl Codec for NvcompZstdCodec {
61 fn kind(&self) -> CodecKind {
62 CodecKind::NvcompZstd
63 }
64
65 async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
66 let original_size = input.len() as u64;
67 let original_crc = crc32c::crc32c(&input);
68 let codec = Arc::clone(&self.inner);
69 let compressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
70 let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
71 codec.compress(input.as_ref(), &mut out).map_err(|e| {
72 CodecError::Backend(anyhow::anyhow!("nvcomp zstd compress: {e}"))
73 })?;
74 Ok(out)
75 })
76 .await??;
77 let manifest = ChunkManifest {
78 codec: CodecKind::NvcompZstd,
79 original_size,
80 compressed_size: compressed.len() as u64,
81 crc32c: original_crc,
82 };
83 Ok((Bytes::from(compressed), manifest))
84 }
85
86 async fn decompress(
87 &self,
88 input: Bytes,
89 manifest: &ChunkManifest,
90 ) -> Result<Bytes, CodecError> {
91 if manifest.codec != CodecKind::NvcompZstd {
92 return Err(CodecError::CodecMismatch {
93 expected: CodecKind::NvcompZstd,
94 got: manifest.codec,
95 });
96 }
97 let expected_crc = manifest.crc32c;
104 let expected_orig_size = validate_decompress_manifest(manifest, input.len())?;
105 let codec = Arc::clone(&self.inner);
106 let decompressed =
107 tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
108 let mut out =
118 Vec::with_capacity(expected_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
119 codec.decompress(input.as_ref(), &mut out).map_err(|e| {
120 CodecError::Backend(anyhow::anyhow!("nvcomp zstd decompress: {e}"))
121 })?;
122 Ok(out)
123 })
124 .await??;
125 if decompressed.len() != expected_orig_size {
126 return Err(CodecError::SizeMismatch {
127 expected: manifest.original_size,
128 got: decompressed.len() as u64,
129 });
130 }
131 let actual_crc = crc32c::crc32c(&decompressed);
132 if actual_crc != expected_crc {
133 return Err(CodecError::CrcMismatch {
134 expected: expected_crc,
135 got: actual_crc,
136 });
137 }
138 Ok(Bytes::from(decompressed))
139 }
140 }
141
    /// Codec handle for the nvCOMP bitcomp backend.
    ///
    /// The backend codec is kept behind an [`Arc`] so that every async call can clone
    /// a handle and move it into a blocking task.
    pub struct NvcompBitcompCodec {
        /// Shared backend codec; constructed with `Algo::Bitcomp { data_type }` by
        /// [`NvcompBitcompCodec::new`].
        inner: Arc<NvcompCodec>,
    }
146
147 impl NvcompBitcompCodec {
148 pub fn new(data_type: BitcompDataType) -> Result<Self, CodecError> {
151 let inner = NvcompCodec::new(Algo::Bitcomp { data_type })
152 .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp init: {e}")))?;
153 Ok(Self {
154 inner: Arc::new(inner),
155 })
156 }
157
158 pub fn default_general() -> Result<Self, CodecError> {
160 Self::new(BitcompDataType::Char)
161 }
162 }
163
164 #[async_trait::async_trait]
165 impl Codec for NvcompBitcompCodec {
166 fn kind(&self) -> CodecKind {
167 CodecKind::NvcompBitcomp
168 }
169
170 async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
171 let original_size = input.len() as u64;
172 let original_crc = crc32c::crc32c(&input);
173 let codec = Arc::clone(&self.inner);
174 let compressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
175 let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
176 codec.compress(input.as_ref(), &mut out).map_err(|e| {
177 CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp compress: {e}"))
178 })?;
179 Ok(out)
180 })
181 .await??;
182 let manifest = ChunkManifest {
183 codec: CodecKind::NvcompBitcomp,
184 original_size,
185 compressed_size: compressed.len() as u64,
186 crc32c: original_crc,
187 };
188 Ok((Bytes::from(compressed), manifest))
189 }
190
191 async fn decompress(
192 &self,
193 input: Bytes,
194 manifest: &ChunkManifest,
195 ) -> Result<Bytes, CodecError> {
196 if manifest.codec != CodecKind::NvcompBitcomp {
197 return Err(CodecError::CodecMismatch {
198 expected: CodecKind::NvcompBitcomp,
199 got: manifest.codec,
200 });
201 }
202 let expected_crc = manifest.crc32c;
204 let expected_orig_size = validate_decompress_manifest(manifest, input.len())?;
205 let codec = Arc::clone(&self.inner);
206 let decompressed =
207 tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
208 let mut out =
218 Vec::with_capacity(expected_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
219 codec.decompress(input.as_ref(), &mut out).map_err(|e| {
220 CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp decompress: {e}"))
221 })?;
222 Ok(out)
223 })
224 .await??;
225 if decompressed.len() != expected_orig_size {
226 return Err(CodecError::SizeMismatch {
227 expected: manifest.original_size,
228 got: decompressed.len() as u64,
229 });
230 }
231 let actual_crc = crc32c::crc32c(&decompressed);
232 if actual_crc != expected_crc {
233 return Err(CodecError::CrcMismatch {
234 expected: expected_crc,
235 got: actual_crc,
236 });
237 }
238 Ok(Bytes::from(decompressed))
239 }
240 }
241
    /// Codec handle for the nvCOMP GDeflate backend.
    ///
    /// The backend codec is kept behind an [`Arc`] so that every async call can clone
    /// a handle and move it into a blocking task.
    pub struct NvcompGDeflateCodec {
        /// Shared backend codec; constructed with `Algo::GDeflate` by
        /// [`NvcompGDeflateCodec::new`].
        inner: Arc<NvcompCodec>,
    }
249
250 impl NvcompGDeflateCodec {
251 pub fn new() -> Result<Self, CodecError> {
252 let inner = NvcompCodec::new(Algo::GDeflate)
253 .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp gdeflate init: {e}")))?;
254 Ok(Self {
255 inner: Arc::new(inner),
256 })
257 }
258 }
259
260 #[async_trait::async_trait]
261 impl Codec for NvcompGDeflateCodec {
262 fn kind(&self) -> CodecKind {
263 CodecKind::NvcompGDeflate
264 }
265
266 async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
267 let original_size = input.len() as u64;
268 let original_crc = crc32c::crc32c(&input);
269 let codec = Arc::clone(&self.inner);
270 let compressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
271 let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
272 codec.compress(input.as_ref(), &mut out).map_err(|e| {
273 CodecError::Backend(anyhow::anyhow!("nvcomp gdeflate compress: {e}"))
274 })?;
275 Ok(out)
276 })
277 .await??;
278 let manifest = ChunkManifest {
279 codec: CodecKind::NvcompGDeflate,
280 original_size,
281 compressed_size: compressed.len() as u64,
282 crc32c: original_crc,
283 };
284 Ok((Bytes::from(compressed), manifest))
285 }
286
287 async fn decompress(
288 &self,
289 input: Bytes,
290 manifest: &ChunkManifest,
291 ) -> Result<Bytes, CodecError> {
292 if manifest.codec != CodecKind::NvcompGDeflate {
293 return Err(CodecError::CodecMismatch {
294 expected: CodecKind::NvcompGDeflate,
295 got: manifest.codec,
296 });
297 }
298 let expected_crc = manifest.crc32c;
300 let expected_orig_size = validate_decompress_manifest(manifest, input.len())?;
301 let codec = Arc::clone(&self.inner);
302 let decompressed =
303 tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
304 let mut out =
314 Vec::with_capacity(expected_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
315 codec.decompress(input.as_ref(), &mut out).map_err(|e| {
316 CodecError::Backend(anyhow::anyhow!("nvcomp gdeflate decompress: {e}"))
317 })?;
318 Ok(out)
319 })
320 .await??;
321 if decompressed.len() != expected_orig_size {
322 return Err(CodecError::SizeMismatch {
323 expected: manifest.original_size,
324 got: decompressed.len() as u64,
325 });
326 }
327 let actual_crc = crc32c::crc32c(&decompressed);
328 if actual_crc != expected_crc {
329 return Err(CodecError::CrcMismatch {
330 expected: expected_crc,
331 got: actual_crc,
332 });
333 }
334 Ok(Bytes::from(decompressed))
335 }
336 }
337
    /// Reports whether the nvCOMP backend is usable at runtime.
    ///
    /// Thin delegation: returns whatever `NvcompCodec::is_available()` reports.
    pub fn is_gpu_available() -> bool {
        NvcompCodec::is_available()
    }
342}
343
344#[cfg(feature = "nvcomp-gpu")]
345pub use imp::{NvcompBitcompCodec, NvcompGDeflateCodec, NvcompZstdCodec, is_gpu_available};
346
347#[cfg(feature = "nvcomp-gpu")]
353pub use crate::ferro_compress::BitcompDataType;
354
/// Fallback compiled when the `nvcomp-gpu` feature is disabled.
///
/// Always returns `false`; nothing is probed in this build configuration.
#[cfg(not(feature = "nvcomp-gpu"))]
pub fn is_gpu_available() -> bool {
    false
}
359
360#[cfg(all(test, feature = "nvcomp-gpu"))]
361mod tests {
362 use super::*;
363 use crate::Codec;
364 use bytes::Bytes;
365
366 #[tokio::test]
367 #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
368 async fn nvcomp_zstd_roundtrip() {
369 if !is_gpu_available() {
370 eprintln!("skipping: no CUDA GPU detected at runtime");
371 return;
372 }
373 let codec = NvcompZstdCodec::new().expect("init");
374 let input = Bytes::from(vec![b'a'; 100_000]);
375 let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
376 assert!(compressed.len() < input.len() / 10);
377 let decompressed = codec
378 .decompress(compressed, &manifest)
379 .await
380 .expect("decompress");
381 assert_eq!(decompressed, input);
382 }
383
384 #[tokio::test]
385 #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
386 async fn nvcomp_bitcomp_roundtrip_on_integer_column() {
387 if !is_gpu_available() {
388 eprintln!("skipping: no CUDA GPU detected at runtime");
389 return;
390 }
391 let codec = NvcompBitcompCodec::default_general().expect("init");
392 let mut payload: Vec<u8> = Vec::with_capacity(8192);
394 for i in 0i64..1024 {
395 payload.extend_from_slice(&i.to_le_bytes());
396 }
397 let input = Bytes::from(payload);
398 let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
399 assert!(
401 compressed.len() < input.len() / 2,
402 "bitcomp on monotone i64 should compress >2x, got {} -> {}",
403 input.len(),
404 compressed.len()
405 );
406 let decompressed = codec
407 .decompress(compressed, &manifest)
408 .await
409 .expect("decompress");
410 assert_eq!(decompressed, input);
411 }
412}
413
414#[cfg(test)]
420mod manifest_validate_tests {
421 use super::{MAX_DECOMPRESSED_BYTES, validate_decompress_manifest};
422 use crate::{ChunkManifest, CodecError, CodecKind};
423
    /// Test fixture: an `NvcompZstd` manifest with the given sizes and a zero CRC.
    ///
    /// `original` becomes `original_size` and `compressed` becomes `compressed_size`.
    fn manifest(original: u64, compressed: u64) -> ChunkManifest {
        ChunkManifest {
            codec: CodecKind::NvcompZstd,
            original_size: original,
            compressed_size: compressed,
            crc32c: 0,
        }
    }
432
433 #[test]
434 fn decompress_rejects_manifest_original_size_over_limit() {
435 let m = manifest(MAX_DECOMPRESSED_BYTES + 1, 1024);
439 let err = validate_decompress_manifest(&m, 1024).unwrap_err();
440 match err {
441 CodecError::ManifestSizeExceedsLimit { requested, limit } => {
442 assert_eq!(requested, MAX_DECOMPRESSED_BYTES + 1);
443 assert_eq!(limit, MAX_DECOMPRESSED_BYTES);
444 }
445 other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
446 }
447 }
448
449 #[test]
450 fn decompress_rejects_manifest_compressed_size_mismatch() {
451 let m = manifest(1024, 2048);
455 let err = validate_decompress_manifest(&m, 1024).unwrap_err();
456 match err {
457 CodecError::ManifestSizeMismatch {
458 manifest: m_size,
459 actual,
460 } => {
461 assert_eq!(m_size, 2048);
462 assert_eq!(actual, 1024);
463 }
464 other => panic!("expected ManifestSizeMismatch, got {other:?}"),
465 }
466 }
467
468 #[test]
469 fn decompress_validates_well_formed_manifest() {
470 let m = manifest(MAX_DECOMPRESSED_BYTES, 1024);
473 let n = validate_decompress_manifest(&m, 1024)
474 .expect("well-formed manifest at the ceiling must pass");
475 assert_eq!(n as u64, MAX_DECOMPRESSED_BYTES);
476 }
477
478 #[cfg(target_pointer_width = "32")]
491 #[test]
492 fn decompress_rejects_u64_to_usize_overflow_on_32bit_targets() {
493 let m = manifest(MAX_DECOMPRESSED_BYTES, 1024);
495 let err = validate_decompress_manifest(&m, 1024).unwrap_err();
496 assert!(matches!(err, CodecError::ManifestSizeExceedsLimit { .. }));
497 }
498
499 #[cfg(target_pointer_width = "64")]
505 #[test]
506 fn decompress_rejects_u64_to_usize_overflow_on_32bit_targets() {
507 let m = manifest(MAX_DECOMPRESSED_BYTES, 16);
508 let n = validate_decompress_manifest(&m, 16).expect("limit value narrows on 64-bit");
509 assert_eq!(n as u64, MAX_DECOMPRESSED_BYTES);
510 }
511}