Skip to main content

s4_codec/
nvcomp.rs

//! nvCOMP (NVIDIA proprietary) backend wrapper.
//!
//! ## Design decisions (finalized 2026-05-12)
//!
//! - **Via the integrated ferro-compress**: the Rust binding for nvCOMP has been
//!   physically merged into s4-codec as the internal module `crate::ferro_compress`
//!   (Apache-2.0 OR MIT). This module is a thin adapter that bridges it to the
//!   async [`crate::Codec`] trait.
//! - **Feature gate**: the `nvcomp-gpu` feature is opt-in, so the default build
//!   (cargo check / test) stays green even in environments without the CUDA
//!   toolchain or `NVCOMP_HOME`.
//! - **Distribution**: the nvCOMP redistributable is subject to NVIDIA SLA
//!   restrictions. Phase 1 defaults to the **BYO model** (the customer downloads it
//!   from NGC); bundling it in the AMI will be decided after written confirmation
//!   from NVIDIA.
//!
//! ## Provided codecs
//!
//! - [`NvcompZstdCodec`]: nvCOMP zstd-GPU. General-purpose text / logs.
//! - [`NvcompBitcompCodec`]: nvCOMP Bitcomp. Integer columns (Parquet numeric columns, time series).
//! - [`NvcompGDeflateCodec`]: nvCOMP GDeflate. DEFLATE-family GPU codec for general binary, logs, JSON.
//!
//! ## Building
//!
//! ```bash
//! export NVCOMP_HOME=/path/to/nvcomp-linux-x86_64-5.x.x.x_cuda12-archive
//! cargo build --features nvcomp-gpu
//! cargo test --features nvcomp-gpu -- --ignored  # tests that require a GPU
//! ```
25
26#[cfg(feature = "nvcomp-gpu")]
27mod imp {
28    use std::sync::Arc;
29
30    use crate::ferro_compress::{Algo, BitcompDataType, Codec as FerroCodec, NvcompCodec};
31    use bytes::Bytes;
32
33    use crate::{ChunkManifest, Codec, CodecError, CodecKind};
34
35    /// nvCOMP zstd-GPU を S4 の `Codec` trait に bridge。
36    pub struct NvcompZstdCodec {
37        inner: Arc<NvcompCodec>,
38    }
39
40    impl NvcompZstdCodec {
41        pub fn new() -> Result<Self, CodecError> {
42            let inner = NvcompCodec::new(Algo::Zstd)
43                .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp zstd init: {e}")))?;
44            Ok(Self {
45                inner: Arc::new(inner),
46            })
47        }
48    }
49
50    #[async_trait::async_trait]
51    impl Codec for NvcompZstdCodec {
52        fn kind(&self) -> CodecKind {
53            CodecKind::NvcompZstd
54        }
55
56        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
57            let original_size = input.len() as u64;
58            let original_crc = crc32c::crc32c(&input);
59            let codec = Arc::clone(&self.inner);
60            let compressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
61                let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
62                codec.compress(input.as_ref(), &mut out).map_err(|e| {
63                    CodecError::Backend(anyhow::anyhow!("nvcomp zstd compress: {e}"))
64                })?;
65                Ok(out)
66            })
67            .await??;
68            let manifest = ChunkManifest {
69                codec: CodecKind::NvcompZstd,
70                original_size,
71                compressed_size: compressed.len() as u64,
72                crc32c: original_crc,
73            };
74            Ok((Bytes::from(compressed), manifest))
75        }
76
77        async fn decompress(
78            &self,
79            input: Bytes,
80            manifest: &ChunkManifest,
81        ) -> Result<Bytes, CodecError> {
82            if manifest.codec != CodecKind::NvcompZstd {
83                return Err(CodecError::CodecMismatch {
84                    expected: CodecKind::NvcompZstd,
85                    got: manifest.codec,
86                });
87            }
88            let expected_crc = manifest.crc32c;
89            let expected_orig_size = manifest.original_size as usize;
90            let codec = Arc::clone(&self.inner);
91            let decompressed =
92                tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
93                    let mut out = Vec::with_capacity(expected_orig_size);
94                    codec.decompress(input.as_ref(), &mut out).map_err(|e| {
95                        CodecError::Backend(anyhow::anyhow!("nvcomp zstd decompress: {e}"))
96                    })?;
97                    Ok(out)
98                })
99                .await??;
100            if decompressed.len() != expected_orig_size {
101                return Err(CodecError::SizeMismatch {
102                    expected: manifest.original_size,
103                    got: decompressed.len() as u64,
104                });
105            }
106            let actual_crc = crc32c::crc32c(&decompressed);
107            if actual_crc != expected_crc {
108                return Err(CodecError::CrcMismatch {
109                    expected: expected_crc,
110                    got: actual_crc,
111                });
112            }
113            Ok(Bytes::from(decompressed))
114        }
115    }
116
117    /// nvCOMP Bitcomp を S4 の `Codec` trait に bridge。整数列に最適化。
118    pub struct NvcompBitcompCodec {
119        inner: Arc<NvcompCodec>,
120    }
121
122    impl NvcompBitcompCodec {
123        /// `data_type` で nvCOMP の bit-packing / delta layout が決まる。
124        /// 整数列 / float 列で適切に使い分ける必要がある (Char 汎用は圧縮率が落ちる)。
125        pub fn new(data_type: BitcompDataType) -> Result<Self, CodecError> {
126            let inner = NvcompCodec::new(Algo::Bitcomp { data_type })
127                .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp init: {e}")))?;
128            Ok(Self {
129                inner: Arc::new(inner),
130            })
131        }
132
133        /// デフォルト: data_type=Char (バイト列汎用)
134        pub fn default_general() -> Result<Self, CodecError> {
135            Self::new(BitcompDataType::Char)
136        }
137    }
138
139    #[async_trait::async_trait]
140    impl Codec for NvcompBitcompCodec {
141        fn kind(&self) -> CodecKind {
142            CodecKind::NvcompBitcomp
143        }
144
145        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
146            let original_size = input.len() as u64;
147            let original_crc = crc32c::crc32c(&input);
148            let codec = Arc::clone(&self.inner);
149            let compressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
150                let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
151                codec.compress(input.as_ref(), &mut out).map_err(|e| {
152                    CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp compress: {e}"))
153                })?;
154                Ok(out)
155            })
156            .await??;
157            let manifest = ChunkManifest {
158                codec: CodecKind::NvcompBitcomp,
159                original_size,
160                compressed_size: compressed.len() as u64,
161                crc32c: original_crc,
162            };
163            Ok((Bytes::from(compressed), manifest))
164        }
165
166        async fn decompress(
167            &self,
168            input: Bytes,
169            manifest: &ChunkManifest,
170        ) -> Result<Bytes, CodecError> {
171            if manifest.codec != CodecKind::NvcompBitcomp {
172                return Err(CodecError::CodecMismatch {
173                    expected: CodecKind::NvcompBitcomp,
174                    got: manifest.codec,
175                });
176            }
177            let expected_crc = manifest.crc32c;
178            let expected_orig_size = manifest.original_size as usize;
179            let codec = Arc::clone(&self.inner);
180            let decompressed =
181                tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
182                    let mut out = Vec::with_capacity(expected_orig_size);
183                    codec.decompress(input.as_ref(), &mut out).map_err(|e| {
184                        CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp decompress: {e}"))
185                    })?;
186                    Ok(out)
187                })
188                .await??;
189            if decompressed.len() != expected_orig_size {
190                return Err(CodecError::SizeMismatch {
191                    expected: manifest.original_size,
192                    got: decompressed.len() as u64,
193                });
194            }
195            let actual_crc = crc32c::crc32c(&decompressed);
196            if actual_crc != expected_crc {
197                return Err(CodecError::CrcMismatch {
198                    expected: expected_crc,
199                    got: actual_crc,
200                });
201            }
202            Ok(Bytes::from(decompressed))
203        }
204    }
205
206    /// nvCOMP GDeflate を S4 の `Codec` trait に bridge (v0.2 #9)。
207    /// DEFLATE-family GPU codec。汎用 binary、log、JSON 等に zstd と並ぶ
208    /// 候補。zstd よりは圧縮率劣るが、algorithm-level format が DEFLATE
209    /// 互換なので将来 wrapper を被せれば stock gunzip でも復号可能 (Phase 2)。
210    pub struct NvcompGDeflateCodec {
211        inner: Arc<NvcompCodec>,
212    }
213
214    impl NvcompGDeflateCodec {
215        pub fn new() -> Result<Self, CodecError> {
216            let inner = NvcompCodec::new(Algo::GDeflate)
217                .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp gdeflate init: {e}")))?;
218            Ok(Self {
219                inner: Arc::new(inner),
220            })
221        }
222    }
223
224    #[async_trait::async_trait]
225    impl Codec for NvcompGDeflateCodec {
226        fn kind(&self) -> CodecKind {
227            CodecKind::NvcompGDeflate
228        }
229
230        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
231            let original_size = input.len() as u64;
232            let original_crc = crc32c::crc32c(&input);
233            let codec = Arc::clone(&self.inner);
234            let compressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
235                let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
236                codec.compress(input.as_ref(), &mut out).map_err(|e| {
237                    CodecError::Backend(anyhow::anyhow!("nvcomp gdeflate compress: {e}"))
238                })?;
239                Ok(out)
240            })
241            .await??;
242            let manifest = ChunkManifest {
243                codec: CodecKind::NvcompGDeflate,
244                original_size,
245                compressed_size: compressed.len() as u64,
246                crc32c: original_crc,
247            };
248            Ok((Bytes::from(compressed), manifest))
249        }
250
251        async fn decompress(
252            &self,
253            input: Bytes,
254            manifest: &ChunkManifest,
255        ) -> Result<Bytes, CodecError> {
256            if manifest.codec != CodecKind::NvcompGDeflate {
257                return Err(CodecError::CodecMismatch {
258                    expected: CodecKind::NvcompGDeflate,
259                    got: manifest.codec,
260                });
261            }
262            let expected_crc = manifest.crc32c;
263            let expected_orig_size = manifest.original_size as usize;
264            let codec = Arc::clone(&self.inner);
265            let decompressed =
266                tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
267                    let mut out = Vec::with_capacity(expected_orig_size);
268                    codec.decompress(input.as_ref(), &mut out).map_err(|e| {
269                        CodecError::Backend(anyhow::anyhow!("nvcomp gdeflate decompress: {e}"))
270                    })?;
271                    Ok(out)
272                })
273                .await??;
274            if decompressed.len() != expected_orig_size {
275                return Err(CodecError::SizeMismatch {
276                    expected: manifest.original_size,
277                    got: decompressed.len() as u64,
278                });
279            }
280            let actual_crc = crc32c::crc32c(&decompressed);
281            if actual_crc != expected_crc {
282                return Err(CodecError::CrcMismatch {
283                    expected: expected_crc,
284                    got: actual_crc,
285                });
286            }
287            Ok(Bytes::from(decompressed))
288        }
289    }
290
291    /// CUDA-capable な GPU が runtime に存在するか
292    pub fn is_gpu_available() -> bool {
293        NvcompCodec::is_available()
294    }
295}
296
297#[cfg(feature = "nvcomp-gpu")]
298pub use imp::{NvcompBitcompCodec, NvcompGDeflateCodec, NvcompZstdCodec, is_gpu_available};
299
300/// nvCOMP Bitcomp typed-column hint. Selects the bit-packing layout the
301/// codec uses internally — the right choice can swing compression ratio
302/// from ~1.2× (`Char`, treating numeric data as opaque bytes) to >3.5×
303/// (`Uint32` on a sorted u32 posting list). Exposed publicly so callers
304/// can target their column shape without going through `default_general`.
305#[cfg(feature = "nvcomp-gpu")]
306pub use crate::ferro_compress::BitcompDataType;
307
/// Fallback used when the `nvcomp-gpu` feature is disabled.
///
/// No GPU backend is compiled into such a build, so this always returns
/// `false`; callers can probe for GPU support without feature-gating the call.
#[cfg(not(feature = "nvcomp-gpu"))]
pub fn is_gpu_available() -> bool {
    false
}
312
/// GPU round-trip tests. They are `#[ignore]`d by default and additionally
/// return early when no CUDA GPU is detected at runtime.
#[cfg(all(test, feature = "nvcomp-gpu"))]
mod tests {
    use super::*;
    use crate::Codec;
    use bytes::Bytes;

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_zstd_roundtrip() {
        if !is_gpu_available() {
            eprintln!("skipping: no CUDA GPU detected at runtime");
            return;
        }
        let codec = NvcompZstdCodec::new().expect("init");
        let input = Bytes::from(vec![b'a'; 100_000]);
        let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
        assert!(compressed.len() < input.len() / 10);
        let decompressed = codec
            .decompress(compressed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(decompressed, input);
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_bitcomp_roundtrip_on_integer_column() {
        if !is_gpu_available() {
            eprintln!("skipping: no CUDA GPU detected at runtime");
            return;
        }
        let codec = NvcompBitcompCodec::default_general().expect("init");
        // Parquet-like, monotonically increasing i64 column (8 KB = 1024 elements).
        let mut payload: Vec<u8> = Vec::with_capacity(8192);
        for i in 0i64..1024 {
            payload.extend_from_slice(&i.to_le_bytes());
        }
        let input = Bytes::from(payload);
        let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
        // Bitcomp is expected to compress monotone integers 3.6-7.5x
        // (measured in Phase 0); the assertion only requires >2x.
        assert!(
            compressed.len() < input.len() / 2,
            "bitcomp on monotone i64 should compress >2x, got {} -> {}",
            input.len(),
            compressed.len()
        );
        let decompressed = codec
            .decompress(compressed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(decompressed, input);
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_gdeflate_roundtrip() {
        if !is_gpu_available() {
            eprintln!("skipping: no CUDA GPU detected at runtime");
            return;
        }
        let codec = NvcompGDeflateCodec::new().expect("init");
        let input = Bytes::from(vec![b'a'; 100_000]);
        let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
        // Only require that highly repetitive input shrinks; no ratio has been
        // measured for GDeflate here.
        assert!(
            compressed.len() < input.len(),
            "gdeflate on repetitive input should shrink, got {} -> {}",
            input.len(),
            compressed.len()
        );
        let decompressed = codec
            .decompress(compressed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(decompressed, input);
    }
}
365}