1#[cfg(any(feature = "nvcomp-gpu", test))]
27use crate::{ChunkManifest, CodecError};
28
/// Largest `original_size` (5 GiB) a chunk manifest may declare for GPU decompression.
///
/// The manifest's size field comes from the caller rather than from the codec, so it is
/// checked against this ceiling before an output buffer of that size is reserved.
#[cfg(any(feature = "nvcomp-gpu", test))]
pub const MAX_DECOMPRESSED_BYTES: u64 = 5 << 30; // 5 GiB
43
/// Sanity-checks a [`ChunkManifest`] against the payload it claims to describe and
/// returns the decompressed length as a `usize` suitable for `Vec::with_capacity`.
///
/// Checks run in this order:
/// 1. `original_size` must not exceed [`MAX_DECOMPRESSED_BYTES`].
/// 2. `compressed_size` must equal `actual_compressed_len`.
/// 3. `original_size` must fit in `usize` on the current target.
///
/// # Errors
/// - `CodecError::ManifestSizeExceedsLimit` with `limit: MAX_DECOMPRESSED_BYTES` for (1).
/// - `CodecError::ManifestSizeMismatch` for (2).
/// - `CodecError::ManifestSizeExceedsLimit` with `limit: usize::MAX` for (3).
#[cfg(any(feature = "nvcomp-gpu", test))]
pub(crate) fn validate_decompress_manifest(
    manifest: &ChunkManifest,
    actual_compressed_len: usize,
) -> Result<usize, CodecError> {
    let requested = manifest.original_size;
    if requested > MAX_DECOMPRESSED_BYTES {
        return Err(CodecError::ManifestSizeExceedsLimit {
            requested,
            limit: MAX_DECOMPRESSED_BYTES,
        });
    }

    let actual = actual_compressed_len as u64;
    if manifest.compressed_size != actual {
        return Err(CodecError::ManifestSizeMismatch {
            manifest: manifest.compressed_size,
            actual,
        });
    }

    // On 32-bit targets the ceiling itself does not fit in `usize`; report that as
    // exceeding the platform's addressable limit rather than truncating.
    match usize::try_from(requested) {
        Ok(len) => Ok(len),
        Err(_) => Err(CodecError::ManifestSizeExceedsLimit {
            requested,
            limit: usize::MAX as u64,
        }),
    }
}
77
#[cfg(feature = "nvcomp-gpu")]
mod imp {
    //! nvCOMP-backed `Codec` implementations (Zstd, Bitcomp, GDeflate).
    //!
    //! All three codecs share one pipeline, `compress_on_gpu` / `decompress_on_gpu`.
    //! Both the nvCOMP call and the CRC32C pass over the payload run on Tokio's
    //! blocking pool. Chunks may be up to `MAX_DECOMPRESSED_BYTES` (5 GiB), and
    //! checksumming that much data on an async worker thread would stall the runtime.
    use std::sync::Arc;

    use crate::ferro_compress::{Algo, BitcompDataType, Codec as FerroCodec, NvcompCodec};
    use bytes::Bytes;

    use super::{MAX_DECOMPRESSED_BYTES, validate_decompress_manifest};
    use crate::{ChunkManifest, Codec, CodecError, CodecKind};

    /// Compresses `input` with `codec` and returns the payload plus its manifest.
    ///
    /// `kind` is recorded in the manifest. `label` (e.g. `"zstd"`) is used in backend
    /// error messages. The CRC32C of the original bytes is computed on the blocking pool
    /// alongside compression.
    ///
    /// # Errors
    /// - `ManifestSizeExceedsLimit` if `input` is larger than `MAX_DECOMPRESSED_BYTES`.
    ///   `validate_decompress_manifest` would reject such a chunk on the way back, so
    ///   it is refused up front instead of being written.
    /// - `Backend` if nvCOMP fails; the converted join error if the blocking task panics.
    async fn compress_on_gpu(
        codec: Arc<NvcompCodec>,
        kind: CodecKind,
        label: &'static str,
        input: Bytes,
    ) -> Result<(Bytes, ChunkManifest), CodecError> {
        let original_size = input.len() as u64;
        if original_size > MAX_DECOMPRESSED_BYTES {
            return Err(CodecError::ManifestSizeExceedsLimit {
                requested: original_size,
                limit: MAX_DECOMPRESSED_BYTES,
            });
        }
        let (compressed, original_crc) =
            tokio::task::spawn_blocking(move || -> Result<(Vec<u8>, u32), CodecError> {
                let crc = crc32c::crc32c(&input);
                let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
                codec.compress(input.as_ref(), &mut out).map_err(|e| {
                    CodecError::Backend(anyhow::anyhow!("nvcomp {label} compress: {e}"))
                })?;
                Ok((out, crc))
            })
            .await??;
        let manifest = ChunkManifest {
            codec: kind,
            original_size,
            compressed_size: compressed.len() as u64,
            crc32c: original_crc,
        };
        Ok((Bytes::from(compressed), manifest))
    }

    /// Verifies `manifest`, decompresses `input` with `codec`, and checks the output's
    /// length and CRC32C against the manifest.
    ///
    /// # Errors
    /// - `CodecMismatch` if `manifest.codec` is not `kind`.
    /// - Whatever `validate_decompress_manifest` reports (size ceiling, compressed-length
    ///   mismatch). These are checked before the output buffer is reserved.
    /// - `Backend` if nvCOMP fails; the converted join error if the blocking task panics.
    /// - `SizeMismatch` / `CrcMismatch` if the output disagrees with the manifest.
    async fn decompress_on_gpu(
        codec: Arc<NvcompCodec>,
        kind: CodecKind,
        label: &'static str,
        input: Bytes,
        manifest: &ChunkManifest,
    ) -> Result<Bytes, CodecError> {
        if manifest.codec != kind {
            return Err(CodecError::CodecMismatch {
                expected: kind,
                got: manifest.codec,
            });
        }
        let expected_crc = manifest.crc32c;
        let expected_size = manifest.original_size;
        let expected_len = validate_decompress_manifest(manifest, input.len())?;
        let decompressed =
            tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
                let mut out = Vec::with_capacity(expected_len);
                codec.decompress(input.as_ref(), &mut out).map_err(|e| {
                    CodecError::Backend(anyhow::anyhow!("nvcomp {label} decompress: {e}"))
                })?;
                if out.len() != expected_len {
                    return Err(CodecError::SizeMismatch {
                        expected: expected_size,
                        got: out.len() as u64,
                    });
                }
                // Checksum here rather than after the await: it is a full pass over
                // up to several GiB and must not run on an async worker thread.
                let actual_crc = crc32c::crc32c(&out);
                if actual_crc != expected_crc {
                    return Err(CodecError::CrcMismatch {
                        expected: expected_crc,
                        got: actual_crc,
                    });
                }
                Ok(out)
            })
            .await??;
        Ok(Bytes::from(decompressed))
    }

    /// Zstandard codec executed through nvCOMP.
    pub struct NvcompZstdCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompZstdCodec {
        /// Initialises the nvCOMP Zstd backend.
        ///
        /// # Errors
        /// `CodecError::Backend` if nvCOMP initialisation fails.
        pub fn new() -> Result<Self, CodecError> {
            let inner = NvcompCodec::new(Algo::Zstd)
                .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp zstd init: {e}")))?;
            Ok(Self {
                inner: Arc::new(inner),
            })
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompZstdCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompZstd
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_on_gpu(Arc::clone(&self.inner), CodecKind::NvcompZstd, "zstd", input).await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_on_gpu(
                Arc::clone(&self.inner),
                CodecKind::NvcompZstd,
                "zstd",
                input,
                manifest,
            )
            .await
        }
    }

    /// Bitcomp codec executed through nvCOMP, parameterised by element data type.
    pub struct NvcompBitcompCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompBitcompCodec {
        /// Initialises the nvCOMP Bitcomp backend for the given element `data_type`.
        ///
        /// # Errors
        /// `CodecError::Backend` if nvCOMP initialisation fails.
        pub fn new(data_type: BitcompDataType) -> Result<Self, CodecError> {
            let inner = NvcompCodec::new(Algo::Bitcomp { data_type })
                .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp init: {e}")))?;
            Ok(Self {
                inner: Arc::new(inner),
            })
        }

        /// Shorthand for `NvcompBitcompCodec::new(BitcompDataType::Char)`.
        ///
        /// # Errors
        /// Same as [`NvcompBitcompCodec::new`].
        pub fn default_general() -> Result<Self, CodecError> {
            Self::new(BitcompDataType::Char)
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompBitcompCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompBitcomp
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_on_gpu(
                Arc::clone(&self.inner),
                CodecKind::NvcompBitcomp,
                "bitcomp",
                input,
            )
            .await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_on_gpu(
                Arc::clone(&self.inner),
                CodecKind::NvcompBitcomp,
                "bitcomp",
                input,
                manifest,
            )
            .await
        }
    }

    /// GDeflate codec executed through nvCOMP.
    pub struct NvcompGDeflateCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompGDeflateCodec {
        /// Initialises the nvCOMP GDeflate backend.
        ///
        /// # Errors
        /// `CodecError::Backend` if nvCOMP initialisation fails.
        pub fn new() -> Result<Self, CodecError> {
            let inner = NvcompCodec::new(Algo::GDeflate)
                .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp gdeflate init: {e}")))?;
            Ok(Self {
                inner: Arc::new(inner),
            })
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompGDeflateCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompGDeflate
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_on_gpu(
                Arc::clone(&self.inner),
                CodecKind::NvcompGDeflate,
                "gdeflate",
                input,
            )
            .await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_on_gpu(
                Arc::clone(&self.inner),
                CodecKind::NvcompGDeflate,
                "gdeflate",
                input,
                manifest,
            )
            .await
        }
    }

    /// Returns what `NvcompCodec::is_available()` reports for this process.
    pub fn is_gpu_available() -> bool {
        NvcompCodec::is_available()
    }
}
357
358#[cfg(feature = "nvcomp-gpu")]
359pub use imp::{NvcompBitcompCodec, NvcompGDeflateCodec, NvcompZstdCodec, is_gpu_available};
360
361#[cfg(feature = "nvcomp-gpu")]
367pub use crate::ferro_compress::BitcompDataType;
368
/// Reports whether a GPU codec can be used.
///
/// This variant is compiled only when the `nvcomp-gpu` feature is disabled, so no
/// GPU codec exists and the result is always `false`.
#[cfg(not(feature = "nvcomp-gpu"))]
pub fn is_gpu_available() -> bool {
    // Always `false` under this cfg; spelled this way so the reason is explicit.
    cfg!(feature = "nvcomp-gpu")
}
373
#[cfg(all(test, feature = "nvcomp-gpu"))]
mod tests {
    //! GPU round-trip tests. Ignored by default because they need nvCOMP at build time
    //! and a CUDA device at run time.
    use super::*;
    use crate::Codec;
    use bytes::Bytes;

    /// Returns `true`, after logging why, when no CUDA GPU is visible at run time.
    fn no_gpu() -> bool {
        let missing = !is_gpu_available();
        if missing {
            eprintln!("skipping: no CUDA GPU detected at runtime");
        }
        missing
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_zstd_roundtrip() {
        if no_gpu() {
            return;
        }
        let zstd = NvcompZstdCodec::new().expect("init");
        // 100 000 copies of one byte: expected to shrink well below a tenth of its size.
        let original = Bytes::from(vec![b'a'; 100_000]);
        let (packed, manifest) = zstd.compress(original.clone()).await.expect("compress");
        assert!(packed.len() < original.len() / 10);
        let restored = zstd
            .decompress(packed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(restored, original);
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_bitcomp_roundtrip_on_integer_column() {
        if no_gpu() {
            return;
        }
        let bitcomp = NvcompBitcompCodec::default_general().expect("init");
        // 0..1024 as little-endian i64 values (8 KiB total).
        let column: Vec<u8> = (0i64..1024).flat_map(i64::to_le_bytes).collect();
        let original = Bytes::from(column);
        let (packed, manifest) = bitcomp.compress(original.clone()).await.expect("compress");
        assert!(
            packed.len() < original.len() / 2,
            "bitcomp on monotone i64 should compress >2x, got {} -> {}",
            original.len(),
            packed.len()
        );
        let restored = bitcomp
            .decompress(packed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(restored, original);
    }
}
427
#[cfg(test)]
mod manifest_validate_tests {
    //! Host-only tests for `validate_decompress_manifest`; no GPU required.
    use super::{MAX_DECOMPRESSED_BYTES, validate_decompress_manifest};
    use crate::{ChunkManifest, CodecError, CodecKind};

    /// Builds an nvCOMP-Zstd manifest with the given sizes and a dummy CRC.
    fn manifest(original: u64, compressed: u64) -> ChunkManifest {
        ChunkManifest {
            codec: CodecKind::NvcompZstd,
            original_size: original,
            compressed_size: compressed,
            crc32c: 0,
        }
    }

    #[test]
    fn decompress_rejects_manifest_original_size_over_limit() {
        let m = manifest(MAX_DECOMPRESSED_BYTES + 1, 1024);
        let err = validate_decompress_manifest(&m, 1024).unwrap_err();
        match err {
            CodecError::ManifestSizeExceedsLimit { requested, limit } => {
                assert_eq!(requested, MAX_DECOMPRESSED_BYTES + 1);
                assert_eq!(limit, MAX_DECOMPRESSED_BYTES);
            }
            other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
        }
    }

    #[test]
    fn decompress_rejects_manifest_compressed_size_mismatch() {
        let m = manifest(1024, 2048);
        let err = validate_decompress_manifest(&m, 1024).unwrap_err();
        match err {
            CodecError::ManifestSizeMismatch {
                manifest: m_size,
                actual,
            } => {
                assert_eq!(m_size, 2048);
                assert_eq!(actual, 1024);
            }
            other => panic!("expected ManifestSizeMismatch, got {other:?}"),
        }
    }

    #[test]
    fn decompress_checks_size_limit_before_compressed_length() {
        // Both checks would fail here; the ceiling must be reported first.
        let m = manifest(MAX_DECOMPRESSED_BYTES + 1, 2048);
        let err = validate_decompress_manifest(&m, 1024).unwrap_err();
        assert!(matches!(
            err,
            CodecError::ManifestSizeExceedsLimit { requested, limit }
                if requested == MAX_DECOMPRESSED_BYTES + 1 && limit == MAX_DECOMPRESSED_BYTES
        ));
    }

    #[test]
    fn decompress_validates_well_formed_manifest() {
        // Use a size that fits in `usize` on every target. The ceiling value itself
        // does not fit on 32-bit, so it is covered per pointer width below.
        let m = manifest(4096, 1024);
        let n = validate_decompress_manifest(&m, 1024).expect("well-formed manifest must pass");
        assert_eq!(n, 4096);
    }

    #[test]
    fn decompress_accepts_empty_chunk() {
        let m = manifest(0, 0);
        let n = validate_decompress_manifest(&m, 0).expect("empty chunk must pass");
        assert_eq!(n, 0);
    }

    #[cfg(target_pointer_width = "32")]
    #[test]
    fn decompress_rejects_u64_to_usize_overflow_on_32bit_targets() {
        let m = manifest(MAX_DECOMPRESSED_BYTES, 1024);
        let err = validate_decompress_manifest(&m, 1024).unwrap_err();
        assert!(matches!(
            err,
            CodecError::ManifestSizeExceedsLimit { requested, limit }
                if requested == MAX_DECOMPRESSED_BYTES && limit == usize::MAX as u64
        ));
    }

    #[cfg(target_pointer_width = "64")]
    #[test]
    fn decompress_accepts_limit_value_on_64bit_targets() {
        let m = manifest(MAX_DECOMPRESSED_BYTES, 16);
        let n = validate_decompress_manifest(&m, 16).expect("limit value narrows on 64-bit");
        assert_eq!(n as u64, MAX_DECOMPRESSED_BYTES);
    }
}