Skip to main content

s4_codec/
nvcomp.rs

1//! nvCOMP (NVIDIA proprietary) backend ラッパー。
2//!
3//! ## 設計方針 (2026-05-12 確定)
4//!
5//! - **integrated ferro-compress 経由**: nvCOMP の Rust binding を s4-codec の内部
6//!   module `crate::ferro_compress` (Apache-2.0 OR MIT) として物理統合済。本 module は
7//!   それを async な [`crate::Codec`] trait に bridge する薄い adapter。
8//! - **feature gate**: `nvcomp-gpu` feature を opt-in にすることで、CUDA toolchain と
9//!   NVCOMP_HOME が無い環境でも default build (cargo check / test) が green に保たれる。
10//! - **配布形態**: nvCOMP redist は NVIDIA SLA 制約あり。Phase 1 は **BYO 方式**
11//!   (顧客が NGC からダウンロード) を default、AMI 同梱は NVIDIA 書面確認後に判断。
12//!
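//! ## Provided codecs
//!
//! - [`NvcompZstdCodec`]: nvCOMP zstd-GPU. General-purpose text / logs.
//! - [`NvcompBitcompCodec`]: nvCOMP Bitcomp. Integer columns (Parquet numeric columns, time-series).
//! - [`NvcompGDeflateCodec`]: nvCOMP GDeflate. DEFLATE-family general-purpose codec (binary, logs, JSON).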
17//!
18//! ## ビルド方法
19//!
20//! ```bash
21//! export NVCOMP_HOME=/path/to/nvcomp-linux-x86_64-5.x.x.x_cuda12-archive
22//! cargo build --features nvcomp-gpu
23//! cargo test --features nvcomp-gpu -- --ignored  # GPU 必須テスト
24//! ```
25
26// v0.8.6 #89: `MAX_DECOMPRESSED_BYTES` and `validate_decompress_manifest`
27// were promoted from this module to crate root (`s4_codec::*`) so CPU
28// codecs (CpuZstd / CpuGzip) share the exact same pre-allocation guard.
29// Re-exported here under the historical names so any downstream that
30// imported `s4_codec::nvcomp::MAX_DECOMPRESSED_BYTES` keeps compiling.
31#[cfg(any(feature = "nvcomp-gpu", test))]
32pub use crate::{MAX_DECOMPRESSED_BYTES, validate_decompress_manifest};
33
#[cfg(feature = "nvcomp-gpu")]
mod imp {
    use std::sync::Arc;

    use crate::ferro_compress::{Algo, BitcompDataType, Codec as FerroCodec, NvcompCodec};
    use bytes::Bytes;

    use super::validate_decompress_manifest;
    use crate::{ChunkManifest, Codec, CodecError, CodecKind};

    /// Shared compress path for every nvCOMP-backed codec in this module.
    ///
    /// Both the CRC32C of `input` and the nvCOMP compression run on a
    /// `spawn_blocking` worker, so neither whole-chunk pass stalls the async
    /// executor. The returned [`ChunkManifest`] is tagged with `kind`.
    ///
    /// `label` only prefixes backend error messages
    /// (`"nvcomp {label} compress: ..."`).
    ///
    /// # Errors
    ///
    /// - [`CodecError::Backend`] if nvCOMP rejects the input.
    /// - Whatever `CodecError` tokio's `JoinError` converts into, if the
    ///   blocking task does not complete.
    async fn compress_blocking(
        codec: Arc<NvcompCodec>,
        kind: CodecKind,
        label: &'static str,
        input: Bytes,
    ) -> Result<(Bytes, ChunkManifest), CodecError> {
        let original_size = input.len() as u64;
        let (compressed, original_crc) =
            tokio::task::spawn_blocking(move || -> Result<(Vec<u8>, u32), CodecError> {
                let crc = crc32c::crc32c(&input);
                let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
                codec.compress(input.as_ref(), &mut out).map_err(|e| {
                    CodecError::Backend(anyhow::anyhow!("nvcomp {label} compress: {e}"))
                })?;
                Ok((out, crc))
            })
            .await??;
        let manifest = ChunkManifest {
            codec: kind,
            original_size,
            compressed_size: compressed.len() as u64,
            crc32c: original_crc,
        };
        Ok((Bytes::from(compressed), manifest))
    }

    /// Shared decompress path for every nvCOMP-backed codec in this module.
    ///
    /// The checks run in this order, and the first failure is returned:
    /// 1. `manifest.codec` must equal `kind` ([`CodecError::CodecMismatch`]).
    /// 2. The manifest is validated by `validate_decompress_manifest`
    ///    BEFORE any destination buffer is allocated (v0.8.5 #83 H-3). A
    ///    forged `original_size` would otherwise drive `Vec::with_capacity`
    ///    into an OOM before the CRC check runs. A `u64 as usize` truncation
    ///    on a 32-bit target would instead silently under-allocate.
    /// 3. On a `spawn_blocking` worker, the payload is decompressed, the
    ///    output length is compared to the manifest
    ///    ([`CodecError::SizeMismatch`]), and the CRC32C is verified
    ///    ([`CodecError::CrcMismatch`]). Keeping the whole-chunk CRC pass off
    ///    the async executor avoids stalling it. The CRC is skipped when the
    ///    length is already wrong.
    ///
    /// `label` only prefixes backend error messages
    /// (`"nvcomp {label} decompress: ..."`).
    async fn decompress_blocking(
        codec: Arc<NvcompCodec>,
        kind: CodecKind,
        label: &'static str,
        input: Bytes,
        manifest: &ChunkManifest,
    ) -> Result<Bytes, CodecError> {
        if manifest.codec != kind {
            return Err(CodecError::CodecMismatch {
                expected: kind,
                got: manifest.codec,
            });
        }
        let manifest_original_size = manifest.original_size;
        let expected_crc = manifest.crc32c;
        // Must happen before the allocation below; see the doc comment.
        let expected_orig_size = validate_decompress_manifest(manifest, input.len())?;
        let decompressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
            let mut out = Vec::with_capacity(expected_orig_size);
            codec.decompress(input.as_ref(), &mut out).map_err(|e| {
                CodecError::Backend(anyhow::anyhow!("nvcomp {label} decompress: {e}"))
            })?;
            if out.len() != expected_orig_size {
                return Err(CodecError::SizeMismatch {
                    expected: manifest_original_size,
                    got: out.len() as u64,
                });
            }
            let actual_crc = crc32c::crc32c(&out);
            if actual_crc != expected_crc {
                return Err(CodecError::CrcMismatch {
                    expected: expected_crc,
                    got: actual_crc,
                });
            }
            Ok(out)
        })
        .await??;
        Ok(Bytes::from(decompressed))
    }

    /// Bridges nvCOMP zstd-GPU to S4's [`Codec`] trait.
    pub struct NvcompZstdCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompZstdCodec {
        /// Initialises the nvCOMP zstd backend.
        ///
        /// # Errors
        ///
        /// [`CodecError::Backend`] if nvCOMP initialisation fails.
        pub fn new() -> Result<Self, CodecError> {
            let inner = NvcompCodec::new(Algo::Zstd)
                .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp zstd init: {e}")))?;
            Ok(Self {
                inner: Arc::new(inner),
            })
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompZstdCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompZstd
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_blocking(
                Arc::clone(&self.inner),
                CodecKind::NvcompZstd,
                "zstd",
                input,
            )
            .await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_blocking(
                Arc::clone(&self.inner),
                CodecKind::NvcompZstd,
                "zstd",
                input,
                manifest,
            )
            .await
        }
    }

    /// Bridges nvCOMP Bitcomp to S4's [`Codec`] trait; intended for integer columns.
    pub struct NvcompBitcompCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompBitcompCodec {
        /// Initialises the nvCOMP Bitcomp backend.
        ///
        /// `data_type` selects nvCOMP's bit-packing / delta layout. Pick it
        /// to match the column (integer vs float columns). The generic
        /// `Char` setting gives a lower compression ratio.
        ///
        /// # Errors
        ///
        /// [`CodecError::Backend`] if nvCOMP initialisation fails.
        pub fn new(data_type: BitcompDataType) -> Result<Self, CodecError> {
            let inner = NvcompCodec::new(Algo::Bitcomp { data_type })
                .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp init: {e}")))?;
            Ok(Self {
                inner: Arc::new(inner),
            })
        }

        /// Default constructor: `data_type = Char` (generic byte stream).
        pub fn default_general() -> Result<Self, CodecError> {
            Self::new(BitcompDataType::Char)
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompBitcompCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompBitcomp
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_blocking(
                Arc::clone(&self.inner),
                CodecKind::NvcompBitcomp,
                "bitcomp",
                input,
            )
            .await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_blocking(
                Arc::clone(&self.inner),
                CodecKind::NvcompBitcomp,
                "bitcomp",
                input,
                manifest,
            )
            .await
        }
    }

    /// Bridges nvCOMP GDeflate to S4's [`Codec`] trait (v0.2 #9).
    ///
    /// A DEFLATE-family GPU codec. It is an alternative to zstd for generic
    /// binary data, logs, JSON, etc. Its compression ratio is below zstd's,
    /// but the algorithm-level format is DEFLATE-compatible. A future wrapper
    /// could therefore make it decodable with stock gunzip (Phase 2).
    pub struct NvcompGDeflateCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompGDeflateCodec {
        /// Initialises the nvCOMP GDeflate backend.
        ///
        /// # Errors
        ///
        /// [`CodecError::Backend`] if nvCOMP initialisation fails.
        pub fn new() -> Result<Self, CodecError> {
            let inner = NvcompCodec::new(Algo::GDeflate)
                .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp gdeflate init: {e}")))?;
            Ok(Self {
                inner: Arc::new(inner),
            })
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompGDeflateCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompGDeflate
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_blocking(
                Arc::clone(&self.inner),
                CodecKind::NvcompGDeflate,
                "gdeflate",
                input,
            )
            .await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_blocking(
                Arc::clone(&self.inner),
                CodecKind::NvcompGDeflate,
                "gdeflate",
                input,
                manifest,
            )
            .await
        }
    }

    /// Reports whether a CUDA-capable GPU is available at runtime.
    ///
    /// Delegates to `NvcompCodec::is_available`.
    pub fn is_gpu_available() -> bool {
        NvcompCodec::is_available()
    }
}
313
314#[cfg(feature = "nvcomp-gpu")]
315pub use imp::{NvcompBitcompCodec, NvcompGDeflateCodec, NvcompZstdCodec, is_gpu_available};
316
317/// nvCOMP Bitcomp typed-column hint. Selects the bit-packing layout the
318/// codec uses internally — the right choice can swing compression ratio
319/// from ~1.2× (`Char`, treating numeric data as opaque bytes) to >3.5×
320/// (`Uint32` on a sorted u32 posting list). Exposed publicly so callers
321/// can target their column shape without going through `default_general`.
322#[cfg(feature = "nvcomp-gpu")]
323pub use crate::ferro_compress::BitcompDataType;
324
/// Fallback used when the crate is built without the `nvcomp-gpu` feature.
///
/// No nvCOMP codec types are compiled in such a build, so this always
/// reports `false`, whatever hardware the host actually has.
#[cfg(not(feature = "nvcomp-gpu"))]
pub fn is_gpu_available() -> bool {
    const NVCOMP_BACKEND_COMPILED_IN: bool = false;
    NVCOMP_BACKEND_COMPILED_IN
}
329
#[cfg(all(test, feature = "nvcomp-gpu"))]
mod tests {
    use super::*;
    use crate::Codec;
    use bytes::Bytes;

    /// Returns `true` (after logging a skip notice) when no CUDA GPU is
    /// visible at runtime, so the calling test can return early.
    fn no_gpu() -> bool {
        let missing = !is_gpu_available();
        if missing {
            eprintln!("skipping: no CUDA GPU detected at runtime");
        }
        missing
    }

    /// A highly repetitive 100 000-byte input must shrink below 1/10 of its
    /// size and round-trip unchanged.
    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_zstd_roundtrip() {
        if no_gpu() {
            return;
        }
        let codec = NvcompZstdCodec::new().expect("init");
        let original = Bytes::from(vec![b'a'; 100_000]);

        let (packed, manifest) = codec.compress(original.clone()).await.expect("compress");
        assert!(packed.len() < original.len() / 10);

        let restored = codec.decompress(packed, &manifest).await.expect("decompress");
        assert_eq!(restored, original);
    }

    /// A Parquet-style, monotonically increasing little-endian i64 column
    /// (1024 elements = 8 KiB) must more than halve and round-trip unchanged.
    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_bitcomp_roundtrip_on_integer_column() {
        if no_gpu() {
            return;
        }
        let codec = NvcompBitcompCodec::default_general().expect("init");
        let column: Vec<u8> = (0i64..1024).flat_map(i64::to_le_bytes).collect();
        let original = Bytes::from(column);

        let (packed, manifest) = codec.compress(original.clone()).await.expect("compress");
        // Phase 0 measurements saw 3.6-7.5x on monotone integers; require >2x here.
        assert!(
            packed.len() < original.len() / 2,
            "bitcomp on monotone i64 should compress >2x, got {} -> {}",
            original.len(),
            packed.len()
        );

        let restored = codec.decompress(packed, &manifest).await.expect("decompress");
        assert_eq!(restored, original);
    }
}
383
/// v0.8.5 #83 H-3 unit tests for the manifest pre-allocation validator.
/// These run without `nvcomp-gpu` because they exercise the
/// pure-host validation path — no CUDA runtime, no large allocations.
/// Keeps the safety guard exercised on every `cargo test -p s4-codec`
/// invocation, even on machines that can't build the full nvCOMP backend.
#[cfg(test)]
mod manifest_validate_tests {
    use super::{MAX_DECOMPRESSED_BYTES, validate_decompress_manifest};
    use crate::{ChunkManifest, CodecError, CodecKind};

    /// Builds an `NvcompZstd` manifest with the given sizes and a zero
    /// placeholder CRC.
    fn manifest(original: u64, compressed: u64) -> ChunkManifest {
        ChunkManifest {
            codec: CodecKind::NvcompZstd,
            original_size: original,
            compressed_size: compressed,
            crc32c: 0,
        }
    }

    #[test]
    fn decompress_rejects_manifest_original_size_over_limit() {
        // Forged manifest claiming one byte more than the decompressed-size
        // ceiling. Without the guard this would reach
        // `Vec::with_capacity(original_size)` and risk an OOM before any
        // CRC check ran.
        let m = manifest(MAX_DECOMPRESSED_BYTES + 1, 1024);
        let err = validate_decompress_manifest(&m, 1024).unwrap_err();
        match err {
            CodecError::ManifestSizeExceedsLimit { requested, limit } => {
                assert_eq!(requested, MAX_DECOMPRESSED_BYTES + 1);
                assert_eq!(limit, MAX_DECOMPRESSED_BYTES);
            }
            other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
        }
    }

    #[test]
    fn decompress_rejects_manifest_compressed_size_mismatch() {
        // Forged manifest whose compressed_size disagrees with the
        // actual payload length. It fails fast before allocation, so a
        // truncated or padded payload cannot drive a sized read.
        let m = manifest(1024, 2048);
        let err = validate_decompress_manifest(&m, 1024).unwrap_err();
        match err {
            CodecError::ManifestSizeMismatch {
                manifest: m_size,
                actual,
            } => {
                assert_eq!(m_size, 2048);
                assert_eq!(actual, 1024);
            }
            other => panic!("expected ManifestSizeMismatch, got {other:?}"),
        }
    }

    #[test]
    fn decompress_validates_well_formed_manifest() {
        // Sanity: an ordinary manifest (64 KiB original, compressed_size
        // matching the payload) is accepted, and the returned allocation
        // size equals `original_size`. The size is deliberately small so
        // the test holds on every pointer width. Acceptance *at* the ceiling
        // is only checked in the 64-bit-gated test below, because the
        // 32-bit test expects the ceiling itself to be rejected there.
        let m = manifest(64 * 1024, 1024);
        let n = validate_decompress_manifest(&m, 1024).expect("well-formed manifest must pass");
        assert_eq!(n, 64 * 1024);
    }

    /// v0.8.5 #83 H-3: on a 32-bit target a `u64 → usize` cast would
    /// truncate a multi-GiB `original_size` to a much smaller value and
    /// silently under-allocate the destination buffer. The ceiling
    /// (5 GiB) already exceeds `usize::MAX` there. A manifest exactly *at*
    /// the ceiling must therefore be rejected, not narrowed.
    #[cfg(target_pointer_width = "32")]
    #[test]
    fn decompress_rejects_u64_to_usize_overflow_on_32bit_targets() {
        // The value equals the (inclusive) limit, so the rejection comes
        // from the narrowing check rather than `original_size > limit`.
        // NOTE(review): this pins the error variant as
        // `ManifestSizeExceedsLimit` — confirm against
        // `validate_decompress_manifest`'s `usize::try_from` arm.
        let m = manifest(MAX_DECOMPRESSED_BYTES, 1024);
        let err = validate_decompress_manifest(&m, 1024).unwrap_err();
        assert!(matches!(err, CodecError::ManifestSizeExceedsLimit { .. }));
    }

    /// 64-bit dual of the 32-bit test above. On 64-bit, every accepted value
    /// narrows to `usize` losslessly. A manifest exactly at
    /// `MAX_DECOMPRESSED_BYTES` is therefore accepted (the limit is
    /// inclusive), and the returned size is not truncated.
    #[cfg(target_pointer_width = "64")]
    #[test]
    fn decompress_accepts_manifest_at_limit_on_64bit_targets() {
        let m = manifest(MAX_DECOMPRESSED_BYTES, 16);
        let n = validate_decompress_manifest(&m, 16).expect("limit value narrows on 64-bit");
        assert_eq!(n as u64, MAX_DECOMPRESSED_BYTES);
    }
}