Skip to main content

s4_codec/
nvcomp.rs

1//! nvCOMP (NVIDIA proprietary) backend ラッパー。
2//!
3//! ## 設計方針 (2026-05-12 確定)
4//!
5//! - **integrated ferro-compress 経由**: nvCOMP の Rust binding を s4-codec の内部
6//!   module `crate::ferro_compress` (Apache-2.0 OR MIT) として物理統合済。本 module は
7//!   それを async な [`crate::Codec`] trait に bridge する薄い adapter。
8//! - **feature gate**: `nvcomp-gpu` feature を opt-in にすることで、CUDA toolchain と
9//!   NVCOMP_HOME が無い環境でも default build (cargo check / test) が green に保たれる。
10//! - **配布形態**: nvCOMP redist は NVIDIA SLA 制約あり。Phase 1 は **BYO 方式**
11//!   (顧客が NGC からダウンロード) を default、AMI 同梱は NVIDIA 書面確認後に判断。
12//!
13//! ## 提供 codec
14//!
//! - [`NvcompZstdCodec`]: nvCOMP zstd-GPU。汎用 text / log。
//! - [`NvcompBitcompCodec`]: nvCOMP Bitcomp。整数列 (Parquet 数値列、time-series)。
//! - [`NvcompGDeflateCodec`]: nvCOMP GDeflate (DEFLATE 系)。汎用 binary / log / JSON。
17//!
18//! ## ビルド方法
19//!
20//! ```bash
21//! export NVCOMP_HOME=/path/to/nvcomp-linux-x86_64-5.x.x.x_cuda12-archive
22//! cargo build --features nvcomp-gpu
23//! cargo test --features nvcomp-gpu -- --ignored  # GPU 必須テスト
24//! ```
25
26#[cfg(any(feature = "nvcomp-gpu", test))]
27use crate::{ChunkManifest, CodecError};
28
/// Upper bound (5 GiB) on the decompressed size a manifest may claim
/// (v0.8.5 #83 H-3).
///
/// Every nvCOMP decompress path compares `original_size` against this
/// value *before* sizing its output buffer, so a forged or corrupted
/// manifest cannot push `Vec::with_capacity` into an OOM (memory-DoS)
/// ahead of the CRC check.
///
/// Why 5 GiB: it is AWS S3's documented ceiling for a single
/// `PUT Object`; anything larger must use multipart upload, whose parts
/// are also capped at 5 GiB. S4 chunks share that bound end-to-end, so
/// a manifest claiming more cannot have come from a well-formed S4 PUT.
#[cfg(any(feature = "nvcomp-gpu", test))]
pub const MAX_DECOMPRESSED_BYTES: u64 = 5 << 30;
43
/// v0.8.5 #83 H-3 helper: shared pre-allocation manifest validator
/// invoked by every nvCOMP decompress path (Zstd / Bitcomp /
/// GDeflate). Centralising the check keeps the three decompress sites
/// (and any future nvCOMP codec) using identical limits and error
/// shapes, so one missed update can't reintroduce the alloc-before-
/// validate bug.
///
/// On success returns `manifest.original_size` narrowed to `usize`,
/// ready for `Vec::with_capacity`.
///
/// # Errors
///
/// The checks run in this order and the first failure is returned; the
/// caller propagates it verbatim:
///
/// 1. `CodecError::ManifestSizeExceedsLimit` with
///    `limit == MAX_DECOMPRESSED_BYTES` when `original_size` is above
///    the ceiling.
/// 2. `CodecError::ManifestSizeMismatch` when `compressed_size` differs
///    from `actual_compressed_len`.
/// 3. `CodecError::ManifestSizeExceedsLimit` with `limit == usize::MAX`
///    when `original_size` does not fit in `usize`. This is only
///    reachable on 32-bit targets, where `usize::MAX` (~4 GiB) is below
///    the 5 GiB ceiling.
#[cfg(any(feature = "nvcomp-gpu", test))]
pub(crate) fn validate_decompress_manifest(
    manifest: &ChunkManifest,
    actual_compressed_len: usize,
) -> Result<usize, CodecError> {
    let requested = manifest.original_size;
    if requested > MAX_DECOMPRESSED_BYTES {
        return Err(CodecError::ManifestSizeExceedsLimit {
            requested,
            limit: MAX_DECOMPRESSED_BYTES,
        });
    }
    // `usize → u64` is lossless on every supported (≤ 64-bit) target.
    let actual = actual_compressed_len as u64;
    if manifest.compressed_size != actual {
        return Err(CodecError::ManifestSizeMismatch {
            manifest: manifest.compressed_size,
            actual,
        });
    }
    // `u64 → usize` is lossy on 32-bit targets (wasm32 / armv7): a plain
    // `as usize` would wrap, e.g., a 4.5 GiB manifest (under the 5 GiB
    // ceiling but above `u32::MAX`) down to ~0.5 GiB and silently
    // under-allocate the destination buffer. Reject it explicitly instead.
    usize::try_from(requested).map_err(|_| CodecError::ManifestSizeExceedsLimit {
        requested,
        limit: usize::MAX as u64,
    })
}
77
#[cfg(feature = "nvcomp-gpu")]
mod imp {
    //! Feature-gated implementation. Each public codec is a thin handle
    //! around a shared `NvcompCodec`; all of them delegate to the same
    //! blocking-pool compress / decompress helpers so size limits,
    //! integrity checks and error shapes stay identical across algorithms.

    use std::sync::Arc;

    use crate::ferro_compress::{Algo, BitcompDataType, Codec as FerroCodec, NvcompCodec};
    use bytes::Bytes;

    use super::{MAX_DECOMPRESSED_BYTES, validate_decompress_manifest};
    use crate::{ChunkManifest, Codec, CodecError, CodecKind};

    /// Wraps an error from the ferro-compress binding as
    /// `CodecError::Backend`, rendered as `"nvcomp {label} {stage}: {err}"`
    /// (e.g. `"nvcomp zstd compress: …"`).
    fn backend_err(label: &str, stage: &str, err: impl std::fmt::Display) -> CodecError {
        CodecError::Backend(anyhow::anyhow!("nvcomp {label} {stage}: {err}"))
    }

    /// Creates the shared nvCOMP handle for `algo`; `label` names the
    /// algorithm in the init error message.
    fn init_backend(algo: Algo, label: &str) -> Result<Arc<NvcompCodec>, CodecError> {
        NvcompCodec::new(algo)
            .map(Arc::new)
            .map_err(|e| backend_err(label, "init", e))
    }

    /// Compresses `input` on Tokio's blocking pool and builds its manifest.
    ///
    /// The manifest records `kind`, the original and compressed sizes and
    /// the CRC32C of the *uncompressed* input (re-checked on decompress).
    ///
    /// # Errors
    ///
    /// - `CodecError::ManifestSizeExceedsLimit` if `input` is larger than
    ///   [`MAX_DECOMPRESSED_BYTES`]. The decompress path rejects every
    ///   manifest above that ceiling, so such a chunk could never be read
    ///   back; refusing it here fails the write instead of a later read.
    /// - `CodecError::Backend` if nvCOMP reports a failure, or the
    ///   converted join error if the blocking task fails.
    async fn compress_chunk(
        codec: &Arc<NvcompCodec>,
        kind: CodecKind,
        label: &'static str,
        input: Bytes,
    ) -> Result<(Bytes, ChunkManifest), CodecError> {
        let original_size = input.len() as u64;
        if original_size > MAX_DECOMPRESSED_BYTES {
            return Err(CodecError::ManifestSizeExceedsLimit {
                requested: original_size,
                limit: MAX_DECOMPRESSED_BYTES,
            });
        }
        let original_crc = crc32c::crc32c(&input);
        let codec = Arc::clone(codec);
        let compressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
            let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
            codec
                .compress(input.as_ref(), &mut out)
                .map_err(|e| backend_err(label, "compress", e))?;
            Ok(out)
        })
        .await??;
        let manifest = ChunkManifest {
            codec: kind,
            original_size,
            compressed_size: compressed.len() as u64,
            crc32c: original_crc,
        };
        Ok((Bytes::from(compressed), manifest))
    }

    /// Validates `manifest`, decompresses `input` on Tokio's blocking pool
    /// and verifies the output against the manifest's size and CRC32C.
    ///
    /// v0.8.5 #83 H-3: the manifest is validated BEFORE the destination
    /// buffer is allocated. A forged manifest with a huge `original_size`
    /// would otherwise drive `Vec::with_capacity` straight into an OOM
    /// (memory-DoS) before the CRC check ever runs, and a `u64 as usize`
    /// truncation on a 32-bit target would silently under-allocate.
    ///
    /// # Errors
    ///
    /// `CodecMismatch` if the manifest names a codec other than `kind`;
    /// any error from [`validate_decompress_manifest`]; `Backend` on an
    /// nvCOMP failure; `SizeMismatch` / `CrcMismatch` if the output
    /// disagrees with the manifest.
    async fn decompress_chunk(
        codec: &Arc<NvcompCodec>,
        kind: CodecKind,
        label: &'static str,
        input: Bytes,
        manifest: &ChunkManifest,
    ) -> Result<Bytes, CodecError> {
        if manifest.codec != kind {
            return Err(CodecError::CodecMismatch {
                expected: kind,
                got: manifest.codec,
            });
        }
        let expected_crc = manifest.crc32c;
        let expected_orig_size = validate_decompress_manifest(manifest, input.len())?;
        let codec = Arc::clone(codec);
        let decompressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
            let mut out = Vec::with_capacity(expected_orig_size);
            codec
                .decompress(input.as_ref(), &mut out)
                .map_err(|e| backend_err(label, "decompress", e))?;
            Ok(out)
        })
        .await??;
        if decompressed.len() != expected_orig_size {
            return Err(CodecError::SizeMismatch {
                expected: manifest.original_size,
                got: decompressed.len() as u64,
            });
        }
        let actual_crc = crc32c::crc32c(&decompressed);
        if actual_crc != expected_crc {
            return Err(CodecError::CrcMismatch {
                expected: expected_crc,
                got: actual_crc,
            });
        }
        Ok(Bytes::from(decompressed))
    }

    /// Bridges nvCOMP zstd-GPU to S4's `Codec` trait.
    pub struct NvcompZstdCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompZstdCodec {
        /// Initialises the nvCOMP zstd backend.
        ///
        /// # Errors
        ///
        /// `CodecError::Backend` if nvCOMP initialisation fails.
        pub fn new() -> Result<Self, CodecError> {
            Ok(Self {
                inner: init_backend(Algo::Zstd, "zstd")?,
            })
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompZstdCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompZstd
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_chunk(&self.inner, CodecKind::NvcompZstd, "zstd", input).await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_chunk(&self.inner, CodecKind::NvcompZstd, "zstd", input, manifest).await
        }
    }

    /// Bridges nvCOMP Bitcomp to S4's `Codec` trait; tuned for integer columns.
    pub struct NvcompBitcompCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompBitcompCodec {
        /// `data_type` selects nvCOMP's bit-packing / delta layout. Pick it
        /// to match the column (integer vs. float); the generic `Char`
        /// layout compresses less.
        ///
        /// # Errors
        ///
        /// `CodecError::Backend` if nvCOMP initialisation fails.
        pub fn new(data_type: BitcompDataType) -> Result<Self, CodecError> {
            Ok(Self {
                inner: init_backend(Algo::Bitcomp { data_type }, "bitcomp")?,
            })
        }

        /// Default layout: `data_type = Char` (generic byte stream).
        pub fn default_general() -> Result<Self, CodecError> {
            Self::new(BitcompDataType::Char)
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompBitcompCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompBitcomp
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_chunk(&self.inner, CodecKind::NvcompBitcomp, "bitcomp", input).await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_chunk(&self.inner, CodecKind::NvcompBitcomp, "bitcomp", input, manifest)
                .await
        }
    }

    /// Bridges nvCOMP GDeflate to S4's `Codec` trait (v0.2 #9).
    ///
    /// A DEFLATE-family GPU codec and a candidate alongside zstd for
    /// generic binary, logs, JSON and similar data. Its compression ratio
    /// is below zstd's, but because the algorithm-level format is DEFLATE
    /// compatible, a future wrapper could make the output decodable with
    /// stock gunzip (Phase 2).
    pub struct NvcompGDeflateCodec {
        inner: Arc<NvcompCodec>,
    }

    impl NvcompGDeflateCodec {
        /// Initialises the nvCOMP GDeflate backend.
        ///
        /// # Errors
        ///
        /// `CodecError::Backend` if nvCOMP initialisation fails.
        pub fn new() -> Result<Self, CodecError> {
            Ok(Self {
                inner: init_backend(Algo::GDeflate, "gdeflate")?,
            })
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompGDeflateCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompGDeflate
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_chunk(&self.inner, CodecKind::NvcompGDeflate, "gdeflate", input).await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_chunk(&self.inner, CodecKind::NvcompGDeflate, "gdeflate", input, manifest)
                .await
        }
    }

    /// Whether a CUDA-capable GPU is available at runtime (delegates to
    /// `NvcompCodec::is_available`).
    pub fn is_gpu_available() -> bool {
        NvcompCodec::is_available()
    }
}
357
/// Bitcomp typed-column hint, re-exported so callers can choose a layout
/// that matches their column shape rather than relying on
/// `default_general`. The choice matters: treating numeric data as
/// opaque bytes (`Char`) yields roughly 1.2×, while `Uint32` on a sorted
/// u32 posting list reaches more than 3.5×.
#[cfg(feature = "nvcomp-gpu")]
pub use crate::ferro_compress::BitcompDataType;

// GPU-backed codecs plus the runtime GPU probe; present only when the
// crate is built with `nvcomp-gpu`.
#[cfg(feature = "nvcomp-gpu")]
pub use imp::{NvcompBitcompCodec, NvcompGDeflateCodec, NvcompZstdCodec, is_gpu_available};

/// Reports whether a CUDA-capable GPU can be used at runtime.
///
/// This build has no nvCOMP backend (the `nvcomp-gpu` feature is off),
/// so the answer is unconditionally `false`.
#[cfg(not(feature = "nvcomp-gpu"))]
pub fn is_gpu_available() -> bool {
    false
}
373
#[cfg(all(test, feature = "nvcomp-gpu"))]
mod tests {
    use super::*;
    use crate::{ChunkManifest, Codec, CodecError};
    use bytes::Bytes;

    /// Highly repetitive input must shrink by more than 10x and round-trip.
    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_zstd_roundtrip() {
        if !is_gpu_available() {
            eprintln!("skipping: no CUDA GPU detected at runtime");
            return;
        }
        let codec = NvcompZstdCodec::new().expect("init");
        let input = Bytes::from(vec![b'a'; 100_000]);
        let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
        assert!(compressed.len() < input.len() / 10);
        let decompressed = codec
            .decompress(compressed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(decompressed, input);
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_bitcomp_roundtrip_on_integer_column() {
        if !is_gpu_available() {
            eprintln!("skipping: no CUDA GPU detected at runtime");
            return;
        }
        let codec = NvcompBitcompCodec::default_general().expect("init");
        // Parquet-style monotonically increasing i64 column
        // (1024 elements = 8 KiB).
        let mut payload: Vec<u8> = Vec::with_capacity(8192);
        for i in 0i64..1024 {
            payload.extend_from_slice(&i.to_le_bytes());
        }
        let input = Bytes::from(payload);
        let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
        // Bitcomp is expected to compress monotone integers 3.6-7.5x
        // (Phase 0 measurements); assert a conservative >2x here.
        // NOTE(review): this uses the generic `Char` layout via
        // `default_general`, for which the `BitcompDataType` docs quote only
        // ~1.2x on numeric data. Confirm this bound holds on real hardware,
        // or switch to a typed layout.
        assert!(
            compressed.len() < input.len() / 2,
            "bitcomp on monotone i64 should compress >2x, got {} -> {}",
            input.len(),
            compressed.len()
        );
        let decompressed = codec
            .decompress(compressed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(decompressed, input);
    }

    /// GDeflate is a public codec too; it must round-trip exactly and
    /// shrink highly repetitive input.
    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_gdeflate_roundtrip() {
        if !is_gpu_available() {
            eprintln!("skipping: no CUDA GPU detected at runtime");
            return;
        }
        let codec = NvcompGDeflateCodec::new().expect("init");
        let input = Bytes::from(vec![b'a'; 100_000]);
        let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
        assert!(
            compressed.len() < input.len(),
            "gdeflate should shrink repetitive input, got {} -> {}",
            input.len(),
            compressed.len()
        );
        let decompressed = codec
            .decompress(compressed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(decompressed, input);
    }

    /// A manifest whose CRC disagrees with the payload must be rejected
    /// after decompression rather than returning corrupted data.
    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_zstd_decompress_rejects_tampered_crc() {
        if !is_gpu_available() {
            eprintln!("skipping: no CUDA GPU detected at runtime");
            return;
        }
        let codec = NvcompZstdCodec::new().expect("init");
        let input = Bytes::from(vec![b'z'; 4096]);
        let (compressed, manifest) = codec.compress(input).await.expect("compress");
        let tampered = ChunkManifest {
            codec: manifest.codec,
            original_size: manifest.original_size,
            compressed_size: manifest.compressed_size,
            crc32c: manifest.crc32c ^ 1,
        };
        let err = codec
            .decompress(compressed, &tampered)
            .await
            .unwrap_err();
        assert!(
            matches!(err, CodecError::CrcMismatch { .. }),
            "expected CrcMismatch, got {err:?}"
        );
    }
}
427
/// v0.8.5 #83 H-3 unit tests for the manifest pre-allocation validator.
/// These run without `nvcomp-gpu` because they exercise the
/// pure-host validation path — no CUDA runtime, no large allocations.
/// Keeps the safety guard exercised on every `cargo test -p s4-codec`
/// invocation, even on machines that can't build the full nvCOMP backend.
/// Every case passes on both 32- and 64-bit hosts; cases whose outcome
/// depends on pointer width are gated on `target_pointer_width`.
#[cfg(test)]
mod manifest_validate_tests {
    use super::{MAX_DECOMPRESSED_BYTES, validate_decompress_manifest};
    use crate::{ChunkManifest, CodecError, CodecKind};

    /// Builds a zstd manifest with the given sizes; the CRC is not
    /// consulted by the validator.
    fn manifest(original: u64, compressed: u64) -> ChunkManifest {
        ChunkManifest {
            codec: CodecKind::NvcompZstd,
            original_size: original,
            compressed_size: compressed,
            crc32c: 0,
        }
    }

    #[test]
    fn decompress_rejects_manifest_original_size_over_limit() {
        // Forged manifest claiming one byte more than the 5 GiB ceiling.
        // Without the check this would have allocated
        // `Vec::with_capacity(> 5 GiB)` and tripped an OOM before any
        // CRC check.
        let m = manifest(MAX_DECOMPRESSED_BYTES + 1, 1024);
        let err = validate_decompress_manifest(&m, 1024).unwrap_err();
        match err {
            CodecError::ManifestSizeExceedsLimit { requested, limit } => {
                assert_eq!(requested, MAX_DECOMPRESSED_BYTES + 1);
                assert_eq!(limit, MAX_DECOMPRESSED_BYTES);
            }
            other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
        }
    }

    #[test]
    fn decompress_rejects_manifest_compressed_size_mismatch() {
        // Forged manifest whose compressed_size disagrees with the
        // actual payload length — fails fast pre-allocation so a
        // truncated / padded payload cannot drive a sized read.
        let m = manifest(1024, 2048);
        let err = validate_decompress_manifest(&m, 1024).unwrap_err();
        match err {
            CodecError::ManifestSizeMismatch {
                manifest: m_size,
                actual,
            } => {
                assert_eq!(m_size, 2048);
                assert_eq!(actual, 1024);
            }
            other => panic!("expected ManifestSizeMismatch, got {other:?}"),
        }
    }

    #[test]
    fn decompress_validates_well_formed_manifest() {
        // Sanity: a well-formed manifest whose original_size fits `usize`
        // on every target is accepted and narrowed unchanged. The ceiling
        // value itself is pointer-width dependent (rejected on 32-bit,
        // accepted on 64-bit) and is covered by the gated tests below.
        let m = manifest(64 * 1024 * 1024, 1024);
        let n = validate_decompress_manifest(&m, 1024).expect("well-formed manifest must pass");
        assert_eq!(n, 64 * 1024 * 1024);
    }

    #[test]
    fn decompress_validates_empty_manifest() {
        // A zero-length chunk is a legitimate (if degenerate) payload.
        let m = manifest(0, 0);
        let n = validate_decompress_manifest(&m, 0).expect("empty chunk must pass");
        assert_eq!(n, 0);
    }

    /// v0.8.5 #83 H-3: on 32-bit targets `usize::MAX` (~4 GiB) is below
    /// the 5 GiB ceiling. A manifest exactly *at* the ceiling therefore
    /// passes the limit check (which is a strict `>`), and the
    /// `usize::try_from` arm must reject it instead of letting an
    /// `as usize` cast silently truncate it. The reported
    /// `limit == usize::MAX` pins that it was this arm that fired.
    #[cfg(target_pointer_width = "32")]
    #[test]
    fn decompress_rejects_u64_to_usize_overflow_on_32bit_targets() {
        let m = manifest(MAX_DECOMPRESSED_BYTES, 1024);
        let err = validate_decompress_manifest(&m, 1024).unwrap_err();
        match err {
            CodecError::ManifestSizeExceedsLimit { requested, limit } => {
                assert_eq!(requested, MAX_DECOMPRESSED_BYTES);
                assert_eq!(limit, usize::MAX as u64);
            }
            other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
        }
    }

    /// 64-bit counterpart: every value up to the ceiling fits in `usize`,
    /// so the narrowing arm never fires and the ceiling value itself is
    /// accepted unchanged.
    #[cfg(target_pointer_width = "64")]
    #[test]
    fn decompress_accepts_ceiling_on_64bit_targets() {
        let m = manifest(MAX_DECOMPRESSED_BYTES, 16);
        let n = validate_decompress_manifest(&m, 16).expect("limit value narrows on 64-bit");
        assert_eq!(n as u64, MAX_DECOMPRESSED_BYTES);
    }
}