1#[cfg(any(feature = "nvcomp-gpu", test))]
32pub use crate::{MAX_DECOMPRESSED_BYTES, validate_decompress_manifest};
33
#[cfg(feature = "nvcomp-gpu")]
mod imp {
    use std::sync::Arc;

    use crate::ferro_compress::{Algo, BitcompDataType, Codec as FerroCodec, NvcompCodec};
    use bytes::Bytes;

    use super::validate_decompress_manifest;
    use crate::{ChunkManifest, Codec, CodecError, CodecKind};

    /// Creates an nvcomp backend for `algo`, wrapped in an `Arc` so it can be
    /// moved into blocking tasks cheaply.
    ///
    /// `label` only appears in the error message ("nvcomp {label} init: …").
    fn init_backend(algo: Algo, label: &str) -> Result<Arc<NvcompCodec>, CodecError> {
        NvcompCodec::new(algo)
            .map(Arc::new)
            .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp {label} init: {e}")))
    }

    /// Shared `Codec::compress` implementation for every nvcomp-backed codec.
    ///
    /// The CRC32C and `original_size` are taken from the *uncompressed* input.
    /// The synchronous nvcomp call runs under `spawn_blocking` so it does not
    /// stall async executor threads. The returned manifest is tagged with `kind`.
    ///
    /// # Errors
    /// [`CodecError::Backend`] when nvcomp fails. A failed blocking task is
    /// converted through `?` (presumably a `From<JoinError>` impl on `CodecError`).
    async fn compress_chunk(
        codec: Arc<NvcompCodec>,
        kind: CodecKind,
        label: &'static str,
        input: Bytes,
    ) -> Result<(Bytes, ChunkManifest), CodecError> {
        let original_size = input.len() as u64;
        let original_crc = crc32c::crc32c(&input);
        let compressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
            let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
            codec.compress(input.as_ref(), &mut out).map_err(|e| {
                CodecError::Backend(anyhow::anyhow!("nvcomp {label} compress: {e}"))
            })?;
            Ok(out)
        })
        .await??;
        let manifest = ChunkManifest {
            codec: kind,
            original_size,
            compressed_size: compressed.len() as u64,
            crc32c: original_crc,
        };
        Ok((Bytes::from(compressed), manifest))
    }

    /// Shared `Codec::decompress` implementation for every nvcomp-backed codec.
    ///
    /// Checks run in this order:
    /// 1. `manifest.codec` must equal `kind`.
    /// 2. The manifest is validated by `validate_decompress_manifest` *before*
    ///    any output buffer is sized from it.
    /// 3. The decompressed length must equal the manifest's `original_size`.
    /// 4. The CRC32C of the output must equal the manifest's `crc32c`.
    ///
    /// # Errors
    /// - [`CodecError::CodecMismatch`] when the manifest belongs to another codec.
    /// - Any error returned by `validate_decompress_manifest`.
    /// - [`CodecError::Backend`] when nvcomp fails.
    /// - [`CodecError::SizeMismatch`] / [`CodecError::CrcMismatch`] when the output
    ///   does not match the manifest.
    async fn decompress_chunk(
        codec: Arc<NvcompCodec>,
        kind: CodecKind,
        label: &'static str,
        input: Bytes,
        manifest: &ChunkManifest,
    ) -> Result<Bytes, CodecError> {
        if manifest.codec != kind {
            return Err(CodecError::CodecMismatch {
                expected: kind,
                got: manifest.codec,
            });
        }
        let expected_crc = manifest.crc32c;
        let expected_orig_size = validate_decompress_manifest(manifest, input.len())?;
        let decompressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
            let mut out = Vec::with_capacity(expected_orig_size);
            codec.decompress(input.as_ref(), &mut out).map_err(|e| {
                CodecError::Backend(anyhow::anyhow!("nvcomp {label} decompress: {e}"))
            })?;
            Ok(out)
        })
        .await??;
        if decompressed.len() != expected_orig_size {
            return Err(CodecError::SizeMismatch {
                expected: manifest.original_size,
                got: decompressed.len() as u64,
            });
        }
        let actual_crc = crc32c::crc32c(&decompressed);
        if actual_crc != expected_crc {
            return Err(CodecError::CrcMismatch {
                expected: expected_crc,
                got: actual_crc,
            });
        }
        Ok(Bytes::from(decompressed))
    }

    /// GPU Zstandard codec backed by nvcomp.
    pub struct NvcompZstdCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompZstdCodec {
        /// Initialises the nvcomp Zstandard backend.
        ///
        /// # Errors
        /// [`CodecError::Backend`] if nvcomp initialisation fails.
        pub fn new() -> Result<Self, CodecError> {
            Ok(Self {
                inner: init_backend(Algo::Zstd, "zstd")?,
            })
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompZstdCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompZstd
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_chunk(Arc::clone(&self.inner), CodecKind::NvcompZstd, "zstd", input).await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_chunk(
                Arc::clone(&self.inner),
                CodecKind::NvcompZstd,
                "zstd",
                input,
                manifest,
            )
            .await
        }
    }

    /// GPU Bitcomp codec backed by nvcomp, configured for one element data type.
    pub struct NvcompBitcompCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompBitcompCodec {
        /// Initialises the nvcomp Bitcomp backend for `data_type`.
        ///
        /// # Errors
        /// [`CodecError::Backend`] if nvcomp initialisation fails.
        pub fn new(data_type: BitcompDataType) -> Result<Self, CodecError> {
            Ok(Self {
                inner: init_backend(Algo::Bitcomp { data_type }, "bitcomp")?,
            })
        }

        /// Bitcomp configured with [`BitcompDataType::Char`], i.e. byte-oriented
        /// input with no assumed element width.
        ///
        /// # Errors
        /// [`CodecError::Backend`] if nvcomp initialisation fails.
        pub fn default_general() -> Result<Self, CodecError> {
            Self::new(BitcompDataType::Char)
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompBitcompCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompBitcomp
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_chunk(
                Arc::clone(&self.inner),
                CodecKind::NvcompBitcomp,
                "bitcomp",
                input,
            )
            .await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_chunk(
                Arc::clone(&self.inner),
                CodecKind::NvcompBitcomp,
                "bitcomp",
                input,
                manifest,
            )
            .await
        }
    }

    /// GPU GDeflate codec backed by nvcomp.
    pub struct NvcompGDeflateCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompGDeflateCodec {
        /// Initialises the nvcomp GDeflate backend.
        ///
        /// # Errors
        /// [`CodecError::Backend`] if nvcomp initialisation fails.
        pub fn new() -> Result<Self, CodecError> {
            Ok(Self {
                inner: init_backend(Algo::GDeflate, "gdeflate")?,
            })
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompGDeflateCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompGDeflate
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_chunk(
                Arc::clone(&self.inner),
                CodecKind::NvcompGDeflate,
                "gdeflate",
                input,
            )
            .await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_chunk(
                Arc::clone(&self.inner),
                CodecKind::NvcompGDeflate,
                "gdeflate",
                input,
                manifest,
            )
            .await
        }
    }

    /// Reports whether nvcomp can run on this machine, as determined by
    /// `NvcompCodec::is_available`.
    pub fn is_gpu_available() -> bool {
        NvcompCodec::is_available()
    }
}
313
314#[cfg(feature = "nvcomp-gpu")]
315pub use imp::{NvcompBitcompCodec, NvcompGDeflateCodec, NvcompZstdCodec, is_gpu_available};
316
317#[cfg(feature = "nvcomp-gpu")]
323pub use crate::ferro_compress::BitcompDataType;
324
/// Reports whether an nvcomp-capable GPU can be used.
///
/// This definition is compiled only when the `nvcomp-gpu` feature is off, so no
/// GPU codec exists in this build and the answer is always `false`.
#[cfg(not(feature = "nvcomp-gpu"))]
pub fn is_gpu_available() -> bool {
    false
}
329
#[cfg(all(test, feature = "nvcomp-gpu"))]
mod tests {
    //! GPU round-trip tests. They are `#[ignore]`d by default and additionally
    //! return early when no CUDA device is detected at runtime.
    use super::*;
    use crate::{ChunkManifest, Codec, CodecError, CodecKind};
    use bytes::Bytes;

    /// Returns `true` (after logging) when the test should be skipped because
    /// `is_gpu_available()` reports no usable GPU.
    fn gpu_missing() -> bool {
        if is_gpu_available() {
            return false;
        }
        eprintln!("skipping: no CUDA GPU detected at runtime");
        true
    }

    /// Highly repetitive payload used by the zstd and gdeflate round-trips.
    fn repetitive_payload() -> Bytes {
        Bytes::from(vec![b'a'; 100_000])
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_zstd_roundtrip() {
        if gpu_missing() {
            return;
        }
        let codec = NvcompZstdCodec::new().expect("init");
        let input = repetitive_payload();
        let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
        assert!(compressed.len() < input.len() / 10);
        let decompressed = codec
            .decompress(compressed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(decompressed, input);
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_gdeflate_roundtrip() {
        if gpu_missing() {
            return;
        }
        let codec = NvcompGDeflateCodec::new().expect("init");
        let input = repetitive_payload();
        let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
        assert_eq!(manifest.codec, CodecKind::NvcompGDeflate);
        assert!(compressed.len() < input.len() / 10);
        let decompressed = codec
            .decompress(compressed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(decompressed, input);
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_bitcomp_roundtrip_on_integer_column() {
        if gpu_missing() {
            return;
        }
        // NOTE(review): `default_general` configures `BitcompDataType::Char`, not
        // an i64 data type, even though the payload below is an i64 column.
        let codec = NvcompBitcompCodec::default_general().expect("init");
        let mut payload: Vec<u8> = Vec::with_capacity(8192);
        // 1024 monotonically increasing little-endian i64 values (8 KiB).
        for i in 0i64..1024 {
            payload.extend_from_slice(&i.to_le_bytes());
        }
        let input = Bytes::from(payload);
        let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
        assert!(
            compressed.len() < input.len() / 2,
            "bitcomp on monotone i64 should compress >2x, got {} -> {}",
            input.len(),
            compressed.len()
        );
        let decompressed = codec
            .decompress(compressed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(decompressed, input);
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_decompress_rejects_manifest_for_other_codec() {
        if gpu_missing() {
            return;
        }
        let codec = NvcompZstdCodec::new().expect("init");
        // The codec-kind guard runs before any size validation or GPU work, so
        // an otherwise empty manifest is enough to exercise it.
        let manifest = ChunkManifest {
            codec: CodecKind::NvcompBitcomp,
            original_size: 0,
            compressed_size: 0,
            crc32c: 0,
        };
        let err = codec
            .decompress(Bytes::new(), &manifest)
            .await
            .unwrap_err();
        assert!(
            matches!(
                err,
                CodecError::CodecMismatch {
                    expected: CodecKind::NvcompZstd,
                    got: CodecKind::NvcompBitcomp,
                }
            ),
            "expected CodecMismatch, got {err:?}"
        );
    }
}
383
#[cfg(test)]
mod manifest_validate_tests {
    //! Unit tests for `validate_decompress_manifest`, the guard the nvcomp
    //! `decompress` paths call before sizing their output buffer from a manifest.
    use super::{MAX_DECOMPRESSED_BYTES, validate_decompress_manifest};
    use crate::{ChunkManifest, CodecError, CodecKind};

    /// Builds a manifest with the given sizes; codec and CRC are fixed placeholders.
    fn manifest(original: u64, compressed: u64) -> ChunkManifest {
        ChunkManifest {
            codec: CodecKind::NvcompZstd,
            original_size: original,
            compressed_size: compressed,
            crc32c: 0,
        }
    }

    #[test]
    fn decompress_rejects_manifest_original_size_over_limit() {
        let m = manifest(MAX_DECOMPRESSED_BYTES + 1, 1024);
        let err = validate_decompress_manifest(&m, 1024).unwrap_err();
        match err {
            CodecError::ManifestSizeExceedsLimit { requested, limit } => {
                assert_eq!(requested, MAX_DECOMPRESSED_BYTES + 1);
                assert_eq!(limit, MAX_DECOMPRESSED_BYTES);
            }
            other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
        }
    }

    #[test]
    fn decompress_rejects_manifest_compressed_size_mismatch() {
        let m = manifest(1024, 2048);
        let err = validate_decompress_manifest(&m, 1024).unwrap_err();
        match err {
            CodecError::ManifestSizeMismatch {
                manifest: m_size,
                actual,
            } => {
                assert_eq!(m_size, 2048);
                assert_eq!(actual, 1024);
            }
            other => panic!("expected ManifestSizeMismatch, got {other:?}"),
        }
    }

    /// A small, well-formed manifest must validate on every target width.
    ///
    /// This deliberately avoids `MAX_DECOMPRESSED_BYTES`: on 32-bit targets the
    /// ceiling is expected to be rejected (see the 32-bit test below), so a
    /// portable "ceiling passes" test would contradict it.
    #[test]
    fn decompress_validates_well_formed_manifest() {
        let m = manifest(4096, 1024);
        let n = validate_decompress_manifest(&m, 1024).expect("well-formed manifest must pass");
        assert_eq!(n, 4096);
    }

    /// On 32-bit targets the ceiling cannot be narrowed to `usize`, so it must
    /// be reported as exceeding the limit.
    #[cfg(target_pointer_width = "32")]
    #[test]
    fn decompress_rejects_u64_to_usize_overflow_on_32bit_targets() {
        let m = manifest(MAX_DECOMPRESSED_BYTES, 1024);
        let err = validate_decompress_manifest(&m, 1024).unwrap_err();
        assert!(matches!(err, CodecError::ManifestSizeExceedsLimit { .. }));
    }

    /// On 64-bit targets a manifest exactly at the ceiling is accepted and
    /// narrows losslessly to `usize`, regardless of the compressed length.
    #[cfg(target_pointer_width = "64")]
    #[test]
    fn decompress_accepts_manifest_at_ceiling_on_64bit_targets() {
        for compressed in [1024u64, 16] {
            let m = manifest(MAX_DECOMPRESSED_BYTES, compressed);
            let n = validate_decompress_manifest(&m, compressed as usize)
                .expect("manifest at the ceiling must pass on 64-bit");
            assert_eq!(n as u64, MAX_DECOMPRESSED_BYTES);
        }
    }
}