Skip to main content

s4_codec/
lib.rs

//! S4 compression codec layer. Provides a neutral trait so that backends can be swapped.
//!
//! ## Adopted backends (evaluated 2026-05)
//!
//! - **nvCOMP** (NVIDIA proprietary, license still needs confirmation): Bitcomp / gANS / zstd-GPU
//! - **DietGPU** (Meta, MIT): ANS-only; the license-clean fallback
//! - **CPU zstd**: last-resort fallback / test bed for environments without a GPU
8
9use std::str::FromStr;
10
11use bytes::Bytes;
12use serde::{Deserialize, Serialize};
13use thiserror::Error;
14
15pub mod cpu_gzip;
16pub mod cpu_zstd;
17pub mod dietgpu;
18pub mod dispatcher;
19#[cfg(feature = "nvcomp-gpu")]
20mod ferro_compress;
21/// v0.8 #51: GPU-accelerated CSV column scan for the S3 Select WHERE
22/// evaluator. Feature-gated on `nvcomp-gpu` so the default build doesn't
23/// need cudarc / a CUDA driver. The s4-server `select` module calls into
24/// this only when the parsed query shape matches the supported subset
25/// (single-column equality / inequality / GT / LT / LIKE-prefix), and
26/// otherwise falls back to the existing CPU evaluator transparently.
27#[cfg(feature = "nvcomp-gpu")]
28pub mod gpu_select;
29pub mod index;
30pub mod multipart;
31pub mod nvcomp;
32pub mod passthrough;
33pub mod registry;
34
35pub use registry::CodecRegistry;
36
/// Kind of compression codec (recorded in the manifest and used at decompress
/// time to determine which codec to run).
///
/// NOTE(review): serde's `kebab-case` rule presumably inserts a `-` before
/// every uppercase letter, which would serialize `DietGpuAns` as
/// `diet-gpu-ans` and `NvcompGDeflate` as `nvcomp-g-deflate` — not the
/// `dietgpu-ans` / `nvcomp-gdeflate` names used by `as_str` / `FromStr`.
/// Confirm before treating the serde form and the `as_str` form as
/// interchangeable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum CodecKind {
    /// `"passthrough"`, wire ID 0.
    Passthrough,
    /// nvCOMP Bitcomp: `"nvcomp-bitcomp"`, wire ID 3.
    NvcompBitcomp,
    /// nvCOMP gANS: `"nvcomp-gans"`, wire ID 4.
    NvcompGans,
    /// nvCOMP zstd-GPU: `"nvcomp-zstd"`, wire ID 2.
    NvcompZstd,
    /// DietGPU ANS: `"dietgpu-ans"`, wire ID 5.
    DietGpuAns,
    /// CPU zstd: `"cpu-zstd"`, wire ID 1.
    CpuZstd,
    /// nvCOMP GDeflate (v0.2 #9). DEFLATE-family GPU codec; output bytes are
    /// NOT gzip-compatible at the wire level (different framing) but the
    /// algorithm-level format aligns with stock DEFLATE/zlib decoders given
    /// the right wrapper.
    NvcompGDeflate,
    /// CPU gzip via `flate2` (v0.4 #26). Produces RFC 1952 gzip output that
    /// any standard `gunzip`-aware client can decode without knowing about
    /// S4. Pair with the `Content-Encoding: gzip` header to serve to a
    /// browser / curl that's never heard of S4.
    CpuGzip,
}
58
59impl CodecKind {
60    pub fn as_str(self) -> &'static str {
61        match self {
62            Self::Passthrough => "passthrough",
63            Self::NvcompBitcomp => "nvcomp-bitcomp",
64            Self::NvcompGans => "nvcomp-gans",
65            Self::NvcompZstd => "nvcomp-zstd",
66            Self::DietGpuAns => "dietgpu-ans",
67            Self::CpuZstd => "cpu-zstd",
68            Self::NvcompGDeflate => "nvcomp-gdeflate",
69            Self::CpuGzip => "cpu-gzip",
70        }
71    }
72
73    /// 安定 numeric ID。`s4-codec/multipart.rs` の frame header に書き込む際に使う。
74    /// ⚠️ **この値は wire format の一部** — 既存値の変更禁止 (新 codec は新 ID を割当)。
75    pub fn id(self) -> u32 {
76        match self {
77            Self::Passthrough => 0,
78            Self::CpuZstd => 1,
79            Self::NvcompZstd => 2,
80            Self::NvcompBitcomp => 3,
81            Self::NvcompGans => 4,
82            Self::DietGpuAns => 5,
83            Self::NvcompGDeflate => 6,
84            Self::CpuGzip => 7,
85        }
86    }
87
88    pub fn from_id(id: u32) -> Option<Self> {
89        Some(match id {
90            0 => Self::Passthrough,
91            1 => Self::CpuZstd,
92            2 => Self::NvcompZstd,
93            3 => Self::NvcompBitcomp,
94            4 => Self::NvcompGans,
95            5 => Self::DietGpuAns,
96            6 => Self::NvcompGDeflate,
97            7 => Self::CpuGzip,
98            _ => return None,
99        })
100    }
101}
102
/// Error returned by `CodecKind`'s `FromStr` impl when the input is not one
/// of the known codec names. The wrapped `String` is the rejected input and
/// is echoed in the error message.
#[derive(Debug, thiserror::Error)]
#[error("unknown codec kind: {0}")]
pub struct ParseCodecKindError(String);
106
107impl FromStr for CodecKind {
108    type Err = ParseCodecKindError;
109    fn from_str(s: &str) -> Result<Self, Self::Err> {
110        Ok(match s {
111            "passthrough" => Self::Passthrough,
112            "nvcomp-bitcomp" => Self::NvcompBitcomp,
113            "nvcomp-gans" => Self::NvcompGans,
114            "nvcomp-zstd" => Self::NvcompZstd,
115            "dietgpu-ans" => Self::DietGpuAns,
116            "cpu-zstd" => Self::CpuZstd,
117            "nvcomp-gdeflate" => Self::NvcompGDeflate,
118            "cpu-gzip" => Self::CpuGzip,
119            other => return Err(ParseCodecKindError(other.into())),
120        })
121    }
122}
123
/// Metadata describing a compressed chunk. Stored in the S3 object's metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkManifest {
    /// Codec recorded for this chunk; used later to pick the decompressor.
    pub codec: CodecKind,
    /// Size in bytes before compression (presumably the uncompressed input
    /// length — confirm against the codec implementations).
    pub original_size: u64,
    /// Size in bytes of the compressed payload.
    pub compressed_size: u64,
    /// CRC32C checksum of the chunk.
    /// NOTE(review): whether this covers the original or the compressed bytes
    /// is not visible in this file — confirm.
    pub crc32c: u32,
}
132
/// v0.8 #55: per-op telemetry returned by `CodecRegistry::compress_with_telemetry`
/// / `decompress_with_telemetry`. Lets the s4-server caller stamp Prometheus
/// metrics (`s4_gpu_compress_seconds`, `s4_gpu_throughput_bytes_per_sec`,
/// `s4_gpu_oom_total`) without s4-codec needing a `metrics` dep itself —
/// callback pattern keeps the codec dep tree slim.
///
/// Fields:
/// - `codec`: stable codec kind name (`CodecKind::as_str()` —
///   `"cpu-zstd"` / `"nvcomp-zstd"` / etc).
/// - `bytes_in`: input length to the operation. For compress this is the
///   uncompressed input; for decompress this is the compressed input.
/// - `bytes_out`: output length. For compress = compressed; for decompress
///   = decompressed.
/// - `gpu_seconds`: `Some(elapsed_secs)` for GPU-backed codecs (Nvcomp*),
///   `None` for CPU codecs (CpuZstd / Passthrough / CpuGzip). Callers
///   skip the GPU metric stamp when this is `None`.
/// - `oom`: `true` iff the operation failed with an OOM-classified error.
///   The associated `Result` is still `Err(...)`; this flag exists so the
///   stamp helper can tell OOM apart from generic backend errors without
///   introspecting the `CodecError` chain at the call site.
#[derive(Debug, Clone, Copy)]
pub struct CompressTelemetry {
    /// Stable codec name, as produced by `CodecKind::as_str()`.
    pub codec: &'static str,
    /// Number of bytes fed into the operation.
    pub bytes_in: u64,
    /// Number of bytes the operation produced.
    pub bytes_out: u64,
    /// Elapsed seconds for GPU-backed codecs; `None` for CPU codecs.
    pub gpu_seconds: Option<f64>,
    /// `true` only when the operation failed with an OOM-classified error.
    pub oom: bool,
}
161
162impl CompressTelemetry {
163    /// CPU-codec convenience constructor — `gpu_seconds = None`,
164    /// `oom = false`. Used by passthrough / cpu-zstd / cpu-gzip path.
165    pub fn cpu(codec: &'static str, bytes_in: u64, bytes_out: u64) -> Self {
166        Self {
167            codec,
168            bytes_in,
169            bytes_out,
170            gpu_seconds: None,
171            oom: false,
172        }
173    }
174
175    /// GPU-codec convenience constructor — populates `gpu_seconds`
176    /// from the measured wall-clock duration of the inner compress /
177    /// decompress call.
178    pub fn gpu(codec: &'static str, bytes_in: u64, bytes_out: u64, seconds: f64) -> Self {
179        Self {
180            codec,
181            bytes_in,
182            bytes_out,
183            gpu_seconds: Some(seconds),
184            oom: false,
185        }
186    }
187
188    /// Mark this telemetry as the OOM-failure shape — paired with
189    /// `Err(CodecError::Backend(...))`. Callers stamp
190    /// `s4_gpu_oom_total{codec=...}` when this is `true`.
191    pub fn with_oom(mut self) -> Self {
192        self.oom = true;
193        self
194    }
195}
196
197/// v0.8 #55: heuristic OOM classifier. nvCOMP / cudarc surface OOM as a
198/// `CodecError::Backend(anyhow!("...out of memory..."))` (the underlying
199/// CUDA driver returns `CUDA_ERROR_OUT_OF_MEMORY` which `cudarc` /
200/// nvCOMP stringify); we substring-match for the well-known fragments
201/// so the metric stamp doesn't need to thread a typed error variant
202/// through the FFI boundary. Returns `true` only on a high-confidence
203/// match; non-OOM backend errors (CRC mismatch, IO error, etc.) yield
204/// `false` and are stamped as plain `s4_requests_total{result="err"}`
205/// without bumping the OOM counter.
206pub fn looks_like_oom(err: &CodecError) -> bool {
207    let s = err.to_string().to_ascii_lowercase();
208    s.contains("out of memory")
209        || s.contains("cudaerrormemoryallocation")
210        || s.contains("cuda_error_out_of_memory")
211}
212
/// Error type for codec operations. Using a dedicated type rather than
/// `anyhow::Error` makes it easier for the upper layer (S4Service) to map
/// failures to semantically appropriate HTTP error codes.
#[derive(Debug, Error)]
pub enum CodecError {
    /// The codec that was expected differs from the one actually encountered.
    #[error("codec mismatch: expected {expected:?}, got {got:?}")]
    CodecMismatch { expected: CodecKind, got: CodecKind },

    /// The CRC32C values disagree, which may indicate chunk corruption.
    #[error("crc32c mismatch (chunk corruption?): expected {expected:#010x}, got {got:#010x}")]
    CrcMismatch { expected: u32, got: u32 },

    /// The payload length differs from the compressed size stated in the manifest.
    #[error("compressed size mismatch: manifest says {expected} bytes, payload is {got} bytes")]
    SizeMismatch { expected: u64, got: u64 },

    /// Failure reported by the underlying compression backend; converts from
    /// `anyhow::Error` via `?`.
    #[error("compression backend error: {0}")]
    Backend(#[from] anyhow::Error),

    /// I/O failure; converts from `std::io::Error` via `?`.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),

    /// A tokio task could not be joined; converts from
    /// `tokio::task::JoinError` via `?`.
    #[error("blocking-task join error: {0}")]
    Join(#[from] tokio::task::JoinError),

    /// The requested codec kind has no implementation registered in the
    /// `CodecRegistry` in use.
    #[error("codec {0:?} is not registered in this CodecRegistry")]
    UnregisteredCodec(CodecKind),
}
238
/// Pluggable compression backend trait.
///
/// Everything is async — GPU codecs can await on a CUDA stream, and CPU
/// codecs offload the work to another thread via `spawn_blocking`.
#[async_trait::async_trait]
pub trait Codec: Send + Sync {
    /// The codec kind this implementation provides.
    fn kind(&self) -> CodecKind;

    /// Compress: input bytes → compressed bytes plus the manifest describing them.
    async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError>;

    /// Decompress: compressed bytes plus their manifest → the original bytes.
    async fn decompress(&self, input: Bytes, manifest: &ChunkManifest)
    -> Result<Bytes, CodecError>;
}
255
256pub use dispatcher::CodecDispatcher;