Skip to main content

s4_codec/
nvcomp.rs

1//! nvCOMP (NVIDIA proprietary) backend ラッパー。
2//!
3//! ## 設計方針 (2026-05-12 確定)
4//!
5//! - **integrated ferro-compress 経由**: nvCOMP の Rust binding を s4-codec の内部
6//!   module `crate::ferro_compress` (Apache-2.0 OR MIT) として物理統合済。本 module は
7//!   それを async な [`crate::Codec`] trait に bridge する薄い adapter。
8//! - **feature gate**: `nvcomp-gpu` feature を opt-in にすることで、CUDA toolchain と
9//!   NVCOMP_HOME が無い環境でも default build (cargo check / test) が green に保たれる。
10//! - **配布形態**: nvCOMP redist は NVIDIA SLA 制約あり。Phase 1 は **BYO 方式**
11//!   (顧客が NGC からダウンロード) を default、AMI 同梱は NVIDIA 書面確認後に判断。
12//!
13//! ## 提供 codec
14//!
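//! - [`NvcompZstdCodec`]: nvCOMP zstd-GPU. General-purpose text / logs.
//! - [`NvcompBitcompCodec`]: nvCOMP Bitcomp. Integer columns (Parquet numeric columns, time-series).
//! - [`NvcompGDeflateCodec`]: nvCOMP GDeflate. DEFLATE-family GPU codec for general binary, logs, JSON.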
17//!
18//! ## ビルド方法
19//!
20//! ```bash
21//! export NVCOMP_HOME=/path/to/nvcomp-linux-x86_64-5.x.x.x_cuda12-archive
22//! cargo build --features nvcomp-gpu
23//! cargo test --features nvcomp-gpu -- --ignored  # GPU 必須テスト
24//! ```
25
26// v0.8.6 #89: `MAX_DECOMPRESSED_BYTES` and `validate_decompress_manifest`
27// were promoted from this module to crate root (`s4_codec::*`) so CPU
28// codecs (CpuZstd / CpuGzip) share the exact same pre-allocation guard.
29// Re-exported here under the historical names so any downstream that
30// imported `s4_codec::nvcomp::MAX_DECOMPRESSED_BYTES` keeps compiling.
31#[cfg(any(feature = "nvcomp-gpu", test))]
32pub use crate::{MAX_DECOMPRESSED_BYTES, validate_decompress_manifest};
33
#[cfg(feature = "nvcomp-gpu")]
mod imp {
    use std::sync::Arc;

    use crate::ferro_compress::{Algo, BitcompDataType, Codec as FerroCodec, NvcompCodec};
    use bytes::Bytes;

    use super::validate_decompress_manifest;
    use crate::{ChunkManifest, Codec, CodecError, CodecKind, DECOMPRESS_BOOTSTRAP_CAPACITY};

    /// Creates the nvCOMP backend for `algo` and wraps it in an [`Arc`] so it can be
    /// moved into blocking-pool closures.
    ///
    /// `label` is the algorithm name spliced into the error message
    /// (`"nvcomp <label> init: ..."`).
    ///
    /// # Errors
    ///
    /// Returns [`CodecError::Backend`] when `NvcompCodec::new` fails.
    fn init_backend(algo: Algo, label: &'static str) -> Result<Arc<NvcompCodec>, CodecError> {
        NvcompCodec::new(algo)
            .map(Arc::new)
            .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp {label} init: {e}")))
    }

    /// Shared `Codec::compress` body for every nvCOMP-backed codec.
    ///
    /// Both the CRC32C of the plaintext and the backend compression call run inside
    /// one `spawn_blocking` task. The checksum is a full pass over the payload, so
    /// computing it on the async executor thread would stall other tasks for large
    /// chunks.
    ///
    /// * `kind` - codec tag recorded in the returned manifest.
    /// * `label` - algorithm name spliced into the backend error message.
    ///
    /// # Errors
    ///
    /// Returns [`CodecError::Backend`] when the backend rejects the input, or
    /// whatever the crate's join-error conversion yields when the blocking task
    /// fails to complete.
    async fn compress_offloaded(
        codec: Arc<NvcompCodec>,
        kind: CodecKind,
        label: &'static str,
        input: Bytes,
    ) -> Result<(Bytes, ChunkManifest), CodecError> {
        let original_size = input.len() as u64;
        let (compressed, original_crc) =
            tokio::task::spawn_blocking(move || -> Result<(Vec<u8>, u32), CodecError> {
                let original_crc = crc32c::crc32c(&input);
                let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
                codec.compress(input.as_ref(), &mut out).map_err(|e| {
                    CodecError::Backend(anyhow::anyhow!("nvcomp {label} compress: {e}"))
                })?;
                Ok((out, original_crc))
            })
            .await??;
        let manifest = ChunkManifest {
            codec: kind,
            original_size,
            compressed_size: compressed.len() as u64,
            crc32c: original_crc,
        };
        Ok((Bytes::from(compressed), manifest))
    }

    /// Shared `Codec::decompress` body for every nvCOMP-backed codec.
    ///
    /// Check order (unchanged from the per-codec implementations):
    /// 1. the manifest's codec tag must equal `kind`;
    /// 2. `validate_decompress_manifest` must accept the manifest BEFORE any output
    ///    buffer is allocated (v0.8.5 #83 H-3) so that a forged `original_size`
    ///    cannot drive an allocation ahead of validation;
    /// 3. the decompressed length must equal the validated original size;
    /// 4. the CRC32C of the output must equal the manifest's checksum.
    ///
    /// Steps 3 and 4 run inside the same `spawn_blocking` task as the backend call,
    /// keeping the full-buffer checksum pass off the async executor thread.
    ///
    /// # Errors
    ///
    /// [`CodecError::CodecMismatch`], any error from `validate_decompress_manifest`,
    /// [`CodecError::Backend`], [`CodecError::SizeMismatch`] or
    /// [`CodecError::CrcMismatch`], in that order of precedence.
    async fn decompress_offloaded(
        codec: Arc<NvcompCodec>,
        kind: CodecKind,
        label: &'static str,
        input: Bytes,
        manifest: &ChunkManifest,
    ) -> Result<Bytes, CodecError> {
        if manifest.codec != kind {
            return Err(CodecError::CodecMismatch {
                expected: kind,
                got: manifest.codec,
            });
        }
        // Copy the scalar fields out so the blocking closure owns everything it needs.
        let expected_crc = manifest.crc32c;
        let claimed_original_size = manifest.original_size;
        let expected_orig_size = validate_decompress_manifest(manifest, input.len())?;
        let decompressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
            // v0.8.7 (Codex review HIGH): the initial allocation is capped at the
            // bootstrap capacity (1 MiB per the original note). The backend resizes
            // the Vec itself from the parsed compressed-frame header, so this
            // capacity is only a sizing hint; capping it avoids an unconditional
            // multi-GiB host allocation for a huge-but-accepted `original_size`.
            let mut out =
                Vec::with_capacity(expected_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
            codec.decompress(input.as_ref(), &mut out).map_err(|e| {
                CodecError::Backend(anyhow::anyhow!("nvcomp {label} decompress: {e}"))
            })?;
            if out.len() != expected_orig_size {
                return Err(CodecError::SizeMismatch {
                    expected: claimed_original_size,
                    got: out.len() as u64,
                });
            }
            let actual_crc = crc32c::crc32c(&out);
            if actual_crc != expected_crc {
                return Err(CodecError::CrcMismatch {
                    expected: expected_crc,
                    got: actual_crc,
                });
            }
            Ok(out)
        })
        .await??;
        Ok(Bytes::from(decompressed))
    }

    /// Bridges nvCOMP zstd-GPU to the S4 [`Codec`] trait.
    pub struct NvcompZstdCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompZstdCodec {
        /// Initialises the zstd backend.
        ///
        /// # Errors
        ///
        /// Returns [`CodecError::Backend`] when backend initialisation fails.
        pub fn new() -> Result<Self, CodecError> {
            Ok(Self {
                inner: init_backend(Algo::Zstd, "zstd")?,
            })
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompZstdCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompZstd
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_offloaded(
                Arc::clone(&self.inner),
                CodecKind::NvcompZstd,
                "zstd",
                input,
            )
            .await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_offloaded(
                Arc::clone(&self.inner),
                CodecKind::NvcompZstd,
                "zstd",
                input,
                manifest,
            )
            .await
        }
    }

    /// Bridges nvCOMP Bitcomp to the S4 [`Codec`] trait. Optimised for integer columns.
    pub struct NvcompBitcompCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompBitcompCodec {
        /// Initialises the Bitcomp backend for the given element type.
        ///
        /// `data_type` determines nvCOMP's bit-packing / delta layout. Pick the
        /// variant matching the column (integer vs. float); the generic `Char`
        /// setting yields a lower compression ratio.
        ///
        /// # Errors
        ///
        /// Returns [`CodecError::Backend`] when backend initialisation fails.
        pub fn new(data_type: BitcompDataType) -> Result<Self, CodecError> {
            Ok(Self {
                inner: init_backend(Algo::Bitcomp { data_type }, "bitcomp")?,
            })
        }

        /// Default configuration: `data_type = Char` (generic byte stream).
        ///
        /// # Errors
        ///
        /// Same as [`NvcompBitcompCodec::new`].
        pub fn default_general() -> Result<Self, CodecError> {
            Self::new(BitcompDataType::Char)
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompBitcompCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompBitcomp
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_offloaded(
                Arc::clone(&self.inner),
                CodecKind::NvcompBitcomp,
                "bitcomp",
                input,
            )
            .await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_offloaded(
                Arc::clone(&self.inner),
                CodecKind::NvcompBitcomp,
                "bitcomp",
                input,
                manifest,
            )
            .await
        }
    }

    /// Bridges nvCOMP GDeflate to the S4 [`Codec`] trait (v0.2 #9).
    ///
    /// DEFLATE-family GPU codec; a candidate alongside zstd for general binary,
    /// logs, JSON and similar data. Its compression ratio is lower than zstd's, but
    /// the algorithm-level format is DEFLATE-compatible, so a future wrapper could
    /// make the output decodable by stock gunzip (Phase 2).
    pub struct NvcompGDeflateCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompGDeflateCodec {
        /// Initialises the GDeflate backend.
        ///
        /// # Errors
        ///
        /// Returns [`CodecError::Backend`] when backend initialisation fails.
        pub fn new() -> Result<Self, CodecError> {
            Ok(Self {
                inner: init_backend(Algo::GDeflate, "gdeflate")?,
            })
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompGDeflateCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompGDeflate
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_offloaded(
                Arc::clone(&self.inner),
                CodecKind::NvcompGDeflate,
                "gdeflate",
                input,
            )
            .await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_offloaded(
                Arc::clone(&self.inner),
                CodecKind::NvcompGDeflate,
                "gdeflate",
                input,
                manifest,
            )
            .await
        }
    }

    /// Reports whether a CUDA-capable GPU is present at runtime, as determined by
    /// `NvcompCodec::is_available`.
    pub fn is_gpu_available() -> bool {
        NvcompCodec::is_available()
    }
}
343
344#[cfg(feature = "nvcomp-gpu")]
345pub use imp::{NvcompBitcompCodec, NvcompGDeflateCodec, NvcompZstdCodec, is_gpu_available};
346
347/// nvCOMP Bitcomp typed-column hint. Selects the bit-packing layout the
348/// codec uses internally — the right choice can swing compression ratio
349/// from ~1.2× (`Char`, treating numeric data as opaque bytes) to >3.5×
350/// (`Uint32` on a sorted u32 posting list). Exposed publicly so callers
351/// can target their column shape without going through `default_general`.
352#[cfg(feature = "nvcomp-gpu")]
353pub use crate::ferro_compress::BitcompDataType;
354
/// Reports whether a CUDA-capable GPU is usable by the nvCOMP backend.
///
/// This is the variant compiled when the `nvcomp-gpu` feature is disabled:
/// no GPU backend is built in, so the answer is unconditionally `false`.
#[cfg(not(feature = "nvcomp-gpu"))]
pub fn is_gpu_available() -> bool {
    false
}
359
#[cfg(all(test, feature = "nvcomp-gpu"))]
mod tests {
    use super::*;
    use crate::Codec;
    use bytes::Bytes;

    /// Returns `true` (after logging a skip notice to stderr) when no CUDA GPU is
    /// detected, so GPU-only tests can bail out early instead of failing.
    fn gpu_missing() -> bool {
        let missing = !is_gpu_available();
        if missing {
            eprintln!("skipping: no CUDA GPU detected at runtime");
        }
        missing
    }

    /// 100 000 identical bytes must shrink to under a tenth of their size and
    /// decompress back to the exact original.
    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_zstd_roundtrip() {
        if gpu_missing() {
            return;
        }
        let codec = NvcompZstdCodec::new().expect("init");
        let original = Bytes::from(vec![b'a'; 100_000]);
        let (packed, manifest) = codec.compress(original.clone()).await.expect("compress");
        assert!(packed.len() < original.len() / 10);
        let restored = codec.decompress(packed, &manifest).await.expect("decompress");
        assert_eq!(restored, original);
    }

    /// A monotonically increasing i64 column must compress by more than 2x with the
    /// general-purpose Bitcomp configuration and round-trip losslessly.
    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_bitcomp_roundtrip_on_integer_column() {
        if gpu_missing() {
            return;
        }
        let codec = NvcompBitcompCodec::default_general().expect("init");
        // Parquet-like monotonically increasing i64 column: 1024 elements = 8 KB.
        let column: Vec<u8> = (0i64..1024).flat_map(i64::to_le_bytes).collect();
        let original = Bytes::from(column);
        let (packed, manifest) = codec.compress(original.clone()).await.expect("compress");
        // Bitcomp is expected to reach 3.6-7.5x on monotone integers (Phase 0
        // measurements); the assertion only demands better than 2x.
        assert!(
            packed.len() < original.len() / 2,
            "bitcomp on monotone i64 should compress >2x, got {} -> {}",
            original.len(),
            packed.len()
        );
        let restored = codec.decompress(packed, &manifest).await.expect("decompress");
        assert_eq!(restored, original);
    }
}
413
/// v0.8.5 #83 H-3: unit tests for the manifest validator that runs before any
/// decompression buffer is allocated.
///
/// These are compiled without the `nvcomp-gpu` feature: they only call the
/// host-side validator, so they need no CUDA runtime and make no large
/// allocations. That keeps the guard exercised on every
/// `cargo test -p s4-codec`, including machines that cannot build the nvCOMP
/// backend.
#[cfg(test)]
mod manifest_validate_tests {
    use super::{MAX_DECOMPRESSED_BYTES, validate_decompress_manifest};
    use crate::{ChunkManifest, CodecError, CodecKind};

    /// Builds an `NvcompZstd` manifest with the given sizes and a zero checksum.
    fn zstd_manifest(original_size: u64, compressed_size: u64) -> ChunkManifest {
        ChunkManifest {
            codec: CodecKind::NvcompZstd,
            original_size,
            compressed_size,
            crc32c: 0,
        }
    }

    /// A manifest claiming one byte more than the ceiling is rejected with
    /// `ManifestSizeExceedsLimit` carrying both the requested size and the limit.
    #[test]
    fn decompress_rejects_manifest_original_size_over_limit() {
        // A forged oversized `original_size` would otherwise have sized the
        // destination buffer and hit an OOM before any CRC check ran.
        let oversized = MAX_DECOMPRESSED_BYTES + 1;
        let forged = zstd_manifest(oversized, 1024);
        let err = validate_decompress_manifest(&forged, 1024).unwrap_err();
        let CodecError::ManifestSizeExceedsLimit { requested, limit } = &err else {
            panic!("expected ManifestSizeExceedsLimit, got {err:?}");
        };
        assert_eq!(*requested, oversized);
        assert_eq!(*limit, MAX_DECOMPRESSED_BYTES);
    }

    /// A manifest whose `compressed_size` differs from the real payload length is
    /// rejected with `ManifestSizeMismatch` reporting both numbers.
    #[test]
    fn decompress_rejects_manifest_compressed_size_mismatch() {
        // Failing before allocation means a truncated or padded payload cannot
        // drive a sized read.
        let forged = zstd_manifest(1024, 2048);
        let err = validate_decompress_manifest(&forged, 1024).unwrap_err();
        let CodecError::ManifestSizeMismatch { manifest, actual } = &err else {
            panic!("expected ManifestSizeMismatch, got {err:?}");
        };
        assert_eq!(*manifest, 2048);
        assert_eq!(*actual, 1024);
    }

    /// 64-bit only (v0.9 #106): a manifest sitting exactly at the ceiling with a
    /// matching `compressed_size` is accepted, and the returned size equals the
    /// ceiling.
    ///
    /// Gated on pointer width because the ceiling (documented as 5 GiB) does not
    /// fit in a 32-bit `usize`; the rejection on those targets is covered by the
    /// 32-bit variant of
    /// `decompress_rejects_u64_to_usize_overflow_on_32bit_targets`.
    #[cfg(target_pointer_width = "64")]
    #[test]
    fn decompress_validates_well_formed_manifest() {
        let at_ceiling = zstd_manifest(MAX_DECOMPRESSED_BYTES, 1024);
        let accepted = validate_decompress_manifest(&at_ceiling, 1024)
            .expect("well-formed manifest at the ceiling must pass");
        assert_eq!(accepted as u64, MAX_DECOMPRESSED_BYTES);
    }

    /// v0.8.5 #83 H-3, 32-bit variant: a plain `u64 -> usize` cast would truncate
    /// a multi-GiB `original_size` and silently under-allocate the destination
    /// buffer. On a 32-bit target the ceiling itself exceeds `usize::MAX`, so a
    /// manifest at the ceiling must be rejected.
    ///
    /// The test asserts only the error variant. NOTE(review): which internal check
    /// of `validate_decompress_manifest` produces it (limit comparison vs.
    /// `usize::try_from`) is not visible from this module — confirm against the
    /// validator.
    #[cfg(target_pointer_width = "32")]
    #[test]
    fn decompress_rejects_u64_to_usize_overflow_on_32bit_targets() {
        let at_ceiling = zstd_manifest(MAX_DECOMPRESSED_BYTES, 1024);
        let err = validate_decompress_manifest(&at_ceiling, 1024).unwrap_err();
        assert!(matches!(err, CodecError::ManifestSizeExceedsLimit { .. }));
    }

    /// 64-bit counterpart sharing the 32-bit test's name: on 64-bit targets the
    /// ceiling narrows to `usize` without loss, so the same manifest shape is
    /// accepted and the returned size equals the ceiling.
    #[cfg(target_pointer_width = "64")]
    #[test]
    fn decompress_rejects_u64_to_usize_overflow_on_32bit_targets() {
        let at_ceiling = zstd_manifest(MAX_DECOMPRESSED_BYTES, 16);
        let accepted =
            validate_decompress_manifest(&at_ceiling, 16).expect("limit value narrows on 64-bit");
        assert_eq!(accepted as u64, MAX_DECOMPRESSED_BYTES);
    }
}
520}