s4-codec 0.2.0

S4 (Squished S3) — pluggable GPU/CPU compression codec layer (nvCOMP zstd / Bitcomp, CPU zstd).
Documentation
//! nvCOMP (NVIDIA proprietary) backend ラッパー。
//!
//! ## 設計方針 (2026-05-12 確定)
//!
//! - **integrated ferro-compress 経由**: nvCOMP の Rust binding を s4-codec の内部
//!   module `crate::ferro_compress` (Apache-2.0 OR MIT) として物理統合済。本 module は
//!   それを async な [`crate::Codec`] trait に bridge する薄い adapter。
//! - **feature gate**: `nvcomp-gpu` feature を opt-in にすることで、CUDA toolchain と
//!   NVCOMP_HOME が無い環境でも default build (cargo check / test) が green に保たれる。
//! - **配布形態**: nvCOMP redist は NVIDIA SLA 制約あり。Phase 1 は **BYO 方式**
//!   (顧客が NGC からダウンロード) を default、AMI 同梱は NVIDIA 書面確認後に判断。
//!
//! ## 提供 codec
//!
//! - [`NvcompZstdCodec`]: nvCOMP zstd-GPU。汎用 text / log。
//! - [`NvcompBitcompCodec`]: nvCOMP Bitcomp。整数列 (Parquet 数値列、time-series)。
//! - [`NvcompGDeflateCodec`]: nvCOMP GDeflate。DEFLATE 系 GPU codec。汎用 binary / log / JSON。
//!
//! ## ビルド方法
//!
//! ```bash
//! export NVCOMP_HOME=/path/to/nvcomp-linux-x86_64-5.x.x.x_cuda12-archive
//! cargo build --features nvcomp-gpu
//! cargo test --features nvcomp-gpu -- --ignored  # GPU 必須テスト
//! ```

#[cfg(feature = "nvcomp-gpu")]
mod imp {
    use std::sync::Arc;

    use crate::ferro_compress::{Algo, BitcompDataType, Codec as FerroCodec, NvcompCodec};
    use bytes::Bytes;

    use crate::{ChunkManifest, Codec, CodecError, CodecKind};

    /// Bridges nvCOMP zstd (GPU) to the S4 `Codec` trait.
    pub struct NvcompZstdCodec {
        /// Reference-counted backend handle; the `Codec` impl clones it into
        /// its blocking tasks.
        inner: Arc<NvcompCodec>,
    }

    impl NvcompZstdCodec {
        /// Builds a codec by initialising the backend with `Algo::Zstd`.
        ///
        /// # Errors
        ///
        /// Returns [`CodecError::Backend`] when backend initialisation fails.
        pub fn new() -> Result<Self, CodecError> {
            NvcompCodec::new(Algo::Zstd)
                .map(|backend| Self {
                    inner: Arc::new(backend),
                })
                .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp zstd init: {e}")))
        }
    }

    /// Async [`Codec`] implementation backed by the nvCOMP zstd backend.
    ///
    /// Both directions invoke the backend synchronously inside
    /// `tokio::task::spawn_blocking`.
    #[async_trait::async_trait]
    impl Codec for NvcompZstdCodec {
        /// Identifies this codec as [`CodecKind::NvcompZstd`].
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompZstd
        }

        /// Compresses `input` and returns the payload together with its manifest.
        ///
        /// The manifest records the original length, the compressed length and
        /// the CRC32C of the uncompressed bytes.
        ///
        /// # Errors
        ///
        /// Returns [`CodecError::Backend`] when the backend reports a failure.
        /// A failed blocking task is converted by the first `?` on the join result.
        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            let original_size = input.len() as u64;
            // Checksum the plaintext before it is moved into the blocking task.
            let original_crc = crc32c::crc32c(&input);
            let codec = Arc::clone(&self.inner);
            let compressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
                let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
                codec.compress(input.as_ref(), &mut out).map_err(|e| {
                    CodecError::Backend(anyhow::anyhow!("nvcomp zstd compress: {e}"))
                })?;
                Ok(out)
            })
            .await??;
            let manifest = ChunkManifest {
                codec: CodecKind::NvcompZstd,
                original_size,
                compressed_size: compressed.len() as u64,
                crc32c: original_crc,
            };
            Ok((Bytes::from(compressed), manifest))
        }

        /// Decompresses `input` and validates the result against `manifest`.
        ///
        /// # Errors
        ///
        /// - [`CodecError::CodecMismatch`] when the manifest names another codec.
        /// - [`CodecError::Backend`] when `manifest.original_size` does not fit in
        ///   `usize`, when the output buffer cannot be reserved, or when the
        ///   backend reports a failure.
        /// - [`CodecError::SizeMismatch`] when the output length differs from
        ///   `manifest.original_size`.
        /// - [`CodecError::CrcMismatch`] when the CRC32C of the output differs
        ///   from `manifest.crc32c`.
        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            if manifest.codec != CodecKind::NvcompZstd {
                return Err(CodecError::CodecMismatch {
                    expected: CodecKind::NvcompZstd,
                    got: manifest.codec,
                });
            }
            let expected_crc = manifest.crc32c;
            let expected_size = manifest.original_size;
            // A plain `as usize` would silently truncate on targets where usize is
            // narrower than 64 bits; reject such manifests instead.
            let capacity = usize::try_from(expected_size).map_err(|_| {
                CodecError::Backend(anyhow::anyhow!(
                    "nvcomp zstd decompress: manifest original_size {expected_size} does not fit in usize"
                ))
            })?;
            let codec = Arc::clone(&self.inner);
            let decompressed =
                tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
                    // The size comes from the manifest, so reserve fallibly: an absurd
                    // value must surface as an error, not a panic or an abort.
                    let mut out: Vec<u8> = Vec::new();
                    out.try_reserve_exact(capacity).map_err(|e| {
                        CodecError::Backend(anyhow::anyhow!(
                            "nvcomp zstd decompress: cannot reserve {capacity} bytes: {e}"
                        ))
                    })?;
                    codec.decompress(input.as_ref(), &mut out).map_err(|e| {
                        CodecError::Backend(anyhow::anyhow!("nvcomp zstd decompress: {e}"))
                    })?;
                    Ok(out)
                })
                .await??;
            // Compare in u64 so the check never depends on a narrowed value.
            if decompressed.len() as u64 != expected_size {
                return Err(CodecError::SizeMismatch {
                    expected: expected_size,
                    got: decompressed.len() as u64,
                });
            }
            let actual_crc = crc32c::crc32c(&decompressed);
            if actual_crc != expected_crc {
                return Err(CodecError::CrcMismatch {
                    expected: expected_crc,
                    got: actual_crc,
                });
            }
            Ok(Bytes::from(decompressed))
        }
    }

    /// Bridges nvCOMP Bitcomp to the S4 `Codec` trait. Tuned for integer columns.
    pub struct NvcompBitcompCodec {
        /// Reference-counted backend handle; the `Codec` impl clones it into
        /// its blocking tasks.
        inner: Arc<NvcompCodec>,
    }

    impl NvcompBitcompCodec {
        /// Builds a codec by initialising the backend with `Algo::Bitcomp`.
        ///
        /// `data_type` selects nvCOMP's bit-packing / delta layout. It should be
        /// chosen to match integer or float columns; the generic `Char` setting
        /// gives a lower compression ratio.
        ///
        /// # Errors
        ///
        /// Returns [`CodecError::Backend`] when backend initialisation fails.
        pub fn new(data_type: BitcompDataType) -> Result<Self, CodecError> {
            NvcompCodec::new(Algo::Bitcomp { data_type })
                .map(|backend| Self {
                    inner: Arc::new(backend),
                })
                .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp init: {e}")))
        }

        /// Default constructor: `data_type = Char` (generic byte stream).
        ///
        /// # Errors
        ///
        /// Same as [`NvcompBitcompCodec::new`].
        pub fn default_general() -> Result<Self, CodecError> {
            Self::new(BitcompDataType::Char)
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompBitcompCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompBitcomp
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            let original_size = input.len() as u64;
            let original_crc = crc32c::crc32c(&input);
            let codec = Arc::clone(&self.inner);
            let compressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
                let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
                codec.compress(input.as_ref(), &mut out).map_err(|e| {
                    CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp compress: {e}"))
                })?;
                Ok(out)
            })
            .await??;
            let manifest = ChunkManifest {
                codec: CodecKind::NvcompBitcomp,
                original_size,
                compressed_size: compressed.len() as u64,
                crc32c: original_crc,
            };
            Ok((Bytes::from(compressed), manifest))
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            if manifest.codec != CodecKind::NvcompBitcomp {
                return Err(CodecError::CodecMismatch {
                    expected: CodecKind::NvcompBitcomp,
                    got: manifest.codec,
                });
            }
            let expected_crc = manifest.crc32c;
            let expected_orig_size = manifest.original_size as usize;
            let codec = Arc::clone(&self.inner);
            let decompressed =
                tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
                    let mut out = Vec::with_capacity(expected_orig_size);
                    codec.decompress(input.as_ref(), &mut out).map_err(|e| {
                        CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp decompress: {e}"))
                    })?;
                    Ok(out)
                })
                .await??;
            if decompressed.len() != expected_orig_size {
                return Err(CodecError::SizeMismatch {
                    expected: manifest.original_size,
                    got: decompressed.len() as u64,
                });
            }
            let actual_crc = crc32c::crc32c(&decompressed);
            if actual_crc != expected_crc {
                return Err(CodecError::CrcMismatch {
                    expected: expected_crc,
                    got: actual_crc,
                });
            }
            Ok(Bytes::from(decompressed))
        }
    }

    /// Bridges nvCOMP GDeflate to the S4 `Codec` trait (v0.2 #9).
    ///
    /// A DEFLATE-family GPU codec and a candidate alongside zstd for general
    /// binary data, logs, JSON and similar. Its compression ratio is lower than
    /// zstd's, but the algorithm-level format is DEFLATE-compatible, so a future
    /// wrapper could make the output decodable by stock gunzip (Phase 2).
    pub struct NvcompGDeflateCodec {
        /// Reference-counted backend handle; the `Codec` impl clones it into
        /// its blocking tasks.
        inner: Arc<NvcompCodec>,
    }

    impl NvcompGDeflateCodec {
        /// Builds a codec by initialising the backend with `Algo::GDeflate`.
        ///
        /// # Errors
        ///
        /// Returns [`CodecError::Backend`] when backend initialisation fails.
        pub fn new() -> Result<Self, CodecError> {
            NvcompCodec::new(Algo::GDeflate)
                .map(|backend| Self {
                    inner: Arc::new(backend),
                })
                .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp gdeflate init: {e}")))
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompGDeflateCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompGDeflate
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            let original_size = input.len() as u64;
            let original_crc = crc32c::crc32c(&input);
            let codec = Arc::clone(&self.inner);
            let compressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
                let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
                codec.compress(input.as_ref(), &mut out).map_err(|e| {
                    CodecError::Backend(anyhow::anyhow!("nvcomp gdeflate compress: {e}"))
                })?;
                Ok(out)
            })
            .await??;
            let manifest = ChunkManifest {
                codec: CodecKind::NvcompGDeflate,
                original_size,
                compressed_size: compressed.len() as u64,
                crc32c: original_crc,
            };
            Ok((Bytes::from(compressed), manifest))
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            if manifest.codec != CodecKind::NvcompGDeflate {
                return Err(CodecError::CodecMismatch {
                    expected: CodecKind::NvcompGDeflate,
                    got: manifest.codec,
                });
            }
            let expected_crc = manifest.crc32c;
            let expected_orig_size = manifest.original_size as usize;
            let codec = Arc::clone(&self.inner);
            let decompressed =
                tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
                    let mut out = Vec::with_capacity(expected_orig_size);
                    codec.decompress(input.as_ref(), &mut out).map_err(|e| {
                        CodecError::Backend(anyhow::anyhow!("nvcomp gdeflate decompress: {e}"))
                    })?;
                    Ok(out)
                })
                .await??;
            if decompressed.len() != expected_orig_size {
                return Err(CodecError::SizeMismatch {
                    expected: manifest.original_size,
                    got: decompressed.len() as u64,
                });
            }
            let actual_crc = crc32c::crc32c(&decompressed);
            if actual_crc != expected_crc {
                return Err(CodecError::CrcMismatch {
                    expected: expected_crc,
                    got: actual_crc,
                });
            }
            Ok(Bytes::from(decompressed))
        }
    }

    /// Reports whether a CUDA-capable GPU is present at runtime.
    ///
    /// Returns the result of `NvcompCodec::is_available()` unchanged.
    pub fn is_gpu_available() -> bool {
        NvcompCodec::is_available()
    }
}

#[cfg(feature = "nvcomp-gpu")]
pub use imp::{NvcompBitcompCodec, NvcompGDeflateCodec, NvcompZstdCodec, is_gpu_available};

/// Fallback compiled when the `nvcomp-gpu` feature is disabled: always
/// reports that no GPU is available.
#[cfg(not(feature = "nvcomp-gpu"))]
pub fn is_gpu_available() -> bool {
    false
}

#[cfg(all(test, feature = "nvcomp-gpu"))]
mod tests {
    use super::*;
    use crate::Codec;
    use bytes::Bytes;

    /// Prints the skip notice and returns `true` when no GPU is detected at runtime.
    fn skip_without_gpu() -> bool {
        if is_gpu_available() {
            return false;
        }
        eprintln!("skipping: no CUDA GPU detected at runtime");
        true
    }

    /// Round-trips a highly repetitive buffer through the zstd codec.
    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_zstd_roundtrip() {
        if skip_without_gpu() {
            return;
        }
        let codec = NvcompZstdCodec::new().expect("init");
        let original = Bytes::from(vec![b'a'; 100_000]);
        let (packed, manifest) = codec.compress(original.clone()).await.expect("compress");
        assert!(packed.len() < original.len() / 10);
        let restored = codec
            .decompress(packed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(restored, original);
    }

    /// Round-trips a monotone i64 column through the Bitcomp codec.
    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_bitcomp_roundtrip_on_integer_column() {
        if skip_without_gpu() {
            return;
        }
        let codec = NvcompBitcompCodec::default_general().expect("init");
        // Parquet-like monotonically increasing i64 column (1024 elements = 8 KB).
        let column: Vec<u8> = (0i64..1024).flat_map(i64::to_le_bytes).collect();
        let original = Bytes::from(column);
        let (packed, manifest) = codec.compress(original.clone()).await.expect("compress");
        // Bitcomp is expected to reach 3.6-7.5x on monotone integers (Phase 0 measurements).
        assert!(
            packed.len() < original.len() / 2,
            "bitcomp on monotone i64 should compress >2x, got {} -> {}",
            original.len(),
            packed.len()
        );
        let restored = codec
            .decompress(packed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(restored, original);
    }
}