s4_codec/lib.rs
1//! S4 圧縮 codec layer。バックエンドを差し替え可能にする中立 trait を提供する。
2//!
3//! ## 採用 backend (2026-05 検討)
4//!
5//! - **nvCOMP** (NVIDIA proprietary、要 license 確認): Bitcomp / gANS / zstd-GPU
6//! - **DietGPU** (Meta, MIT): ANS-only、license clean な fallback
7//! - **CPU zstd**: GPU 無し環境向け究極の fallback / test bed
8
9use std::str::FromStr;
10
11use bytes::Bytes;
12use serde::{Deserialize, Serialize};
13use thiserror::Error;
14
15pub mod cpu_gzip;
16pub mod cpu_zstd;
17pub mod dietgpu;
18pub mod dispatcher;
19#[cfg(feature = "nvcomp-gpu")]
20mod ferro_compress;
21/// v0.8 #51: GPU-accelerated CSV column scan for the S3 Select WHERE
22/// evaluator. Feature-gated on `nvcomp-gpu` so the default build doesn't
23/// need cudarc / a CUDA driver. The s4-server `select` module calls into
24/// this only when the parsed query shape matches the supported subset
25/// (single-column equality / inequality / GT / LT / LIKE-prefix), and
26/// otherwise falls back to the existing CPU evaluator transparently.
27#[cfg(feature = "nvcomp-gpu")]
28pub mod gpu_select;
29pub mod index;
30pub mod multipart;
31pub mod nvcomp;
32pub mod passthrough;
33pub mod registry;
34
35pub use registry::CodecRegistry;
36
37/// 圧縮 codec の種類 (manifest に記録、後段の decompress で codec を確定するために使う)
38///
39/// v1.0 stability: `#[non_exhaustive]` — future codecs (e.g. LZ4, Brotli)
40/// may be added in minor releases without bumping major. Downstream
41/// callers must include a `_ =>` arm when matching on this enum.
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
43#[serde(rename_all = "kebab-case")]
44#[non_exhaustive]
45pub enum CodecKind {
46 Passthrough,
47 NvcompBitcomp,
48 NvcompGans,
49 NvcompZstd,
50 DietGpuAns,
51 CpuZstd,
52 /// nvCOMP GDeflate (v0.2 #9). DEFLATE-family GPU codec; output bytes are
53 /// NOT gzip-compatible at the wire level (different framing) but the
54 /// algorithm-level format aligns with stock DEFLATE/zlib decoders given
55 /// the right wrapper.
56 NvcompGDeflate,
57 /// CPU gzip via `flate2` (v0.4 #26). Produces RFC 1952 gzip output that
58 /// any standard `gunzip`-aware client can decode without knowing about
59 /// S4. Pair with the `Content-Encoding: gzip` header to serve to a
60 /// browser / curl that's never heard of S4.
61 CpuGzip,
62}
63
64impl CodecKind {
65 pub fn as_str(self) -> &'static str {
66 match self {
67 Self::Passthrough => "passthrough",
68 Self::NvcompBitcomp => "nvcomp-bitcomp",
69 Self::NvcompGans => "nvcomp-gans",
70 Self::NvcompZstd => "nvcomp-zstd",
71 Self::DietGpuAns => "dietgpu-ans",
72 Self::CpuZstd => "cpu-zstd",
73 Self::NvcompGDeflate => "nvcomp-gdeflate",
74 Self::CpuGzip => "cpu-gzip",
75 }
76 }
77
78 /// 安定 numeric ID。`s4-codec/multipart.rs` の frame header に書き込む際に使う。
79 /// ⚠️ **この値は wire format の一部** — 既存値の変更禁止 (新 codec は新 ID を割当)。
80 pub fn id(self) -> u32 {
81 match self {
82 Self::Passthrough => 0,
83 Self::CpuZstd => 1,
84 Self::NvcompZstd => 2,
85 Self::NvcompBitcomp => 3,
86 Self::NvcompGans => 4,
87 Self::DietGpuAns => 5,
88 Self::NvcompGDeflate => 6,
89 Self::CpuGzip => 7,
90 }
91 }
92
93 pub fn from_id(id: u32) -> Option<Self> {
94 Some(match id {
95 0 => Self::Passthrough,
96 1 => Self::CpuZstd,
97 2 => Self::NvcompZstd,
98 3 => Self::NvcompBitcomp,
99 4 => Self::NvcompGans,
100 5 => Self::DietGpuAns,
101 6 => Self::NvcompGDeflate,
102 7 => Self::CpuGzip,
103 _ => return None,
104 })
105 }
106}
107
108#[derive(Debug, thiserror::Error)]
109#[error("unknown codec kind: {0}")]
110pub struct ParseCodecKindError(String);
111
112impl FromStr for CodecKind {
113 type Err = ParseCodecKindError;
114 fn from_str(s: &str) -> Result<Self, Self::Err> {
115 Ok(match s {
116 "passthrough" => Self::Passthrough,
117 "nvcomp-bitcomp" => Self::NvcompBitcomp,
118 "nvcomp-gans" => Self::NvcompGans,
119 "nvcomp-zstd" => Self::NvcompZstd,
120 "dietgpu-ans" => Self::DietGpuAns,
121 "cpu-zstd" => Self::CpuZstd,
122 "nvcomp-gdeflate" => Self::NvcompGDeflate,
123 "cpu-gzip" => Self::CpuGzip,
124 other => return Err(ParseCodecKindError(other.into())),
125 })
126 }
127}
128
/// Metadata describing one compressed chunk. Stored in the S3 object's
/// metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkManifest {
    /// Codec recorded for this chunk.
    pub codec: CodecKind,
    /// Claimed size, in bytes, of the uncompressed data.
    /// `validate_decompress_manifest` rejects values above
    /// `MAX_DECOMPRESSED_BYTES`.
    pub original_size: u64,
    /// Claimed size, in bytes, of the compressed payload.
    /// `validate_decompress_manifest` requires it to equal the actual
    /// payload length.
    pub compressed_size: u64,
    /// CRC32C checksum for the chunk.
    /// NOTE(review): this file does not show whether it covers the original
    /// or the compressed bytes — confirm against the codec implementations.
    pub crc32c: u32,
}
137
/// v0.8 #55: per-operation telemetry handed back by
/// `CodecRegistry::compress_with_telemetry` / `decompress_with_telemetry`.
///
/// The s4-server caller uses it to stamp Prometheus metrics
/// (`s4_gpu_compress_seconds`, `s4_gpu_throughput_bytes_per_sec`,
/// `s4_gpu_oom_total`), so s4-codec itself does not need a `metrics`
/// dependency.
#[derive(Debug, Clone, Copy)]
pub struct CompressTelemetry {
    /// Stable codec name, i.e. `CodecKind::as_str()` (`"cpu-zstd"`,
    /// `"nvcomp-zstd"`, ...).
    pub codec: &'static str,
    /// Input length of the operation: uncompressed bytes for compress,
    /// compressed bytes for decompress.
    pub bytes_in: u64,
    /// Output length of the operation: compressed bytes for compress,
    /// decompressed bytes for decompress.
    pub bytes_out: u64,
    /// `Some(elapsed_secs)` for GPU-backed codecs, `None` for CPU codecs.
    /// Callers skip the GPU metric stamp when this is `None`.
    pub gpu_seconds: Option<f64>,
    /// `true` iff the operation failed with an OOM-classified error. The
    /// accompanying `Result` is still `Err(...)`; the flag only spares the
    /// call site from inspecting the `CodecError` itself.
    pub oom: bool,
}

impl CompressTelemetry {
    /// Telemetry for a CPU codec (passthrough / cpu-zstd / cpu-gzip): no GPU
    /// timing and no OOM flag.
    pub fn cpu(codec: &'static str, bytes_in: u64, bytes_out: u64) -> Self {
        Self {
            gpu_seconds: None,
            oom: false,
            codec,
            bytes_in,
            bytes_out,
        }
    }

    /// Telemetry for a GPU codec: identical to [`CompressTelemetry::cpu`]
    /// except that `gpu_seconds` carries the measured wall-clock duration
    /// of the inner compress / decompress call.
    pub fn gpu(codec: &'static str, bytes_in: u64, bytes_out: u64, seconds: f64) -> Self {
        Self {
            gpu_seconds: Some(seconds),
            ..Self::cpu(codec, bytes_in, bytes_out)
        }
    }

    /// Returns the same telemetry with `oom` set — the shape paired with an
    /// `Err(CodecError::Backend(...))`. Callers stamp
    /// `s4_gpu_oom_total{codec=...}` when the flag is `true`.
    pub fn with_oom(self) -> Self {
        Self { oom: true, ..self }
    }
}
201
202/// v0.8 #55: heuristic OOM classifier. nvCOMP / cudarc surface OOM as a
203/// `CodecError::Backend(anyhow!("...out of memory..."))` (the underlying
204/// CUDA driver returns `CUDA_ERROR_OUT_OF_MEMORY` which `cudarc` /
205/// nvCOMP stringify); we substring-match for the well-known fragments
206/// so the metric stamp doesn't need to thread a typed error variant
207/// through the FFI boundary. Returns `true` only on a high-confidence
208/// match; non-OOM backend errors (CRC mismatch, IO error, etc.) yield
209/// `false` and are stamped as plain `s4_requests_total{result="err"}`
210/// without bumping the OOM counter.
211pub fn looks_like_oom(err: &CodecError) -> bool {
212 let s = err.to_string().to_ascii_lowercase();
213 s.contains("out of memory")
214 || s.contains("cudaerrormemoryallocation")
215 || s.contains("cuda_error_out_of_memory")
216}
217
/// Error type for codec operations. Using a dedicated type rather than a
/// bare `anyhow::Error` makes it easier for the upper layer (S4Service) to
/// choose an HTTP error code according to what actually went wrong.
///
/// v1.0 stability: `#[non_exhaustive]` — new error variants (e.g. for
/// future codecs or validation guards) may be added in minor releases.
/// Downstream callers must include a `_ =>` arm when matching.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum CodecError {
    /// The codec that was expected differs from the one encountered.
    #[error("codec mismatch: expected {expected:?}, got {got:?}")]
    CodecMismatch { expected: CodecKind, got: CodecKind },

    /// A CRC32C value did not match the expected one (possible chunk
    /// corruption, per the message).
    #[error("crc32c mismatch (chunk corruption?): expected {expected:#010x}, got {got:#010x}")]
    CrcMismatch { expected: u32, got: u32 },

    /// A size recorded in the manifest disagrees with the payload.
    ///
    /// NOTE(review): the message talks about the *compressed* size, whereas
    /// the docs on `ManifestSizeExceedsLimit` / `ManifestSizeMismatch`
    /// describe this variant as a post-decompress check. Confirm at the
    /// construction sites (not in this file) which is accurate.
    #[error("compressed size mismatch: manifest says {expected} bytes, payload is {got} bytes")]
    SizeMismatch { expected: u64, got: u64 },

    /// Failure reported by a compression backend; any `anyhow::Error`
    /// converts into this variant via `From`.
    #[error("compression backend error: {0}")]
    Backend(#[from] anyhow::Error),

    /// I/O failure; converts from `std::io::Error` via `From`.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),

    /// A tokio task could not be joined; converts from
    /// `tokio::task::JoinError` via `From`.
    #[error("blocking-task join error: {0}")]
    Join(#[from] tokio::task::JoinError),

    /// The requested codec kind is not registered in the `CodecRegistry`
    /// in use.
    #[error("codec {0:?} is not registered in this CodecRegistry")]
    UnregisteredCodec(CodecKind),

    /// v0.8.4 #73 M2: streaming compress consumed fewer input bytes than the
    /// caller advertised (typically a client disconnect mid-PUT). Surfaced
    /// from `streaming::streaming_compress_to_frames` when its
    /// `expected_size = Some(n)` argument is supplied; the s4-server PUT
    /// handler maps this to a 400 BadRequest so the client cannot rely on
    /// silent success of a half-uploaded body.
    #[error("streaming compress truncated: expected {expected} input bytes, got {got}")]
    TruncatedStream { expected: u64, got: u64 },

    /// v0.8.15 M-4: the client advertised a `Content-Length` of `expected`
    /// bytes but kept feeding the gateway data past that point. AWS S3
    /// returns `IncompleteBody` / `RequestBodyLengthMismatch` for the
    /// same shape (under-length is `TruncatedStream`, over-length is
    /// this variant). The s4-server PUT handler maps both to 400 so a
    /// client retry can succeed instead of silently storing the
    /// truncated-at-the-listener body.
    #[error("streaming compress over-length: expected {expected} input bytes, got at least {got}")]
    OverlengthStream { expected: u64, got: u64 },

    /// v0.8.5 #83 H-3: decompress refused to honour a manifest whose
    /// `original_size` exceeds the safety ceiling (default 5 GiB — AWS S3
    /// single-PUT max). Without this gate, a forged or corrupted manifest
    /// can drive a `Vec::with_capacity(huge)` and trip an OOM before the
    /// CRC check ever runs. Distinct from `SizeMismatch` because here the
    /// manifest itself is rejected pre-allocation rather than a
    /// post-decompress length comparison.
    ///
    /// Originally nvCOMP-only. In this file it is returned by
    /// `validate_decompress_manifest`, both for the
    /// `MAX_DECOMPRESSED_BYTES` ceiling and when `original_size` does not
    /// fit in `usize`.
    #[error(
        "manifest original_size {requested} exceeds safety limit {limit} \
         (forged / corrupted manifest?)"
    )]
    ManifestSizeExceedsLimit { requested: u64, limit: u64 },

    /// v0.8.5 #83 H-3: decompress saw a manifest whose
    /// `compressed_size` field disagrees with the actual input payload
    /// length. Surfaced before allocation so a forged header can't drive
    /// a sized read against truncated or padded input. Distinct from
    /// `SizeMismatch` (which is the post-decompress original-size check):
    /// this is a pre-flight check on the *compressed* side.
    ///
    /// Originally nvCOMP-only. In this file it is returned by
    /// `validate_decompress_manifest`.
    #[error("manifest compressed_size {manifest} does not match payload length {actual}")]
    ManifestSizeMismatch { manifest: u64, actual: u64 },
}
289
/// v0.8.6 #89: upper bound on the decompressed payload size that any codec
/// accepts at decompress entry. A manifest whose `original_size` is larger
/// is rejected *before* any allocation as forged / corrupted, so it cannot
/// turn a `Vec::with_capacity(huge)` into a memory-DoS ahead of the CRC
/// check.
///
/// History: introduced as `nvcomp::MAX_DECOMPRESSED_BYTES` (v0.8.5 #83) and
/// promoted to the crate root so the CPU codecs (CpuZstd / CpuGzip) share
/// the same ceiling. The promotion followed issue #89, where the continuous
/// fuzz farm drove `cpu_zstd_decompress_bolero` into OOM within minutes
/// because the CPU codecs pre-allocated from the manifest without this
/// guard.
///
/// Why 5 GiB: it is AWS S3's documented single-`PUT Object` ceiling (larger
/// payloads must use multipart upload, with parts of at most 5 GiB). Real
/// S4 chunks obey the same bound end-to-end, so a larger `original_size`
/// cannot come from a well-formed S4 PUT.
pub const MAX_DECOMPRESSED_BYTES: u64 = 5 << 30; // 5 GiB
309
/// v0.8.6 #89: initial capacity for the decompressed-output `Vec`, used in
/// place of `Vec::with_capacity(original_size)` so a forged manifest can no
/// longer force a huge up-front reservation. Whatever size the manifest
/// claims, only 1 MiB is reserved at the start; `read_to_end` then grows the
/// buffer as real decompressed bytes arrive. The original note adds that
/// growth is capped at `manifest.original_size + 1024` by a
/// decompression-bomb guard; that guard lives at the decompress sites, not
/// in this file.
///
/// Why not start from `Vec::new()`? Growing from zero by doubling costs
/// roughly 20 reallocations + copies for a typical 1 MiB chunk. Starting at
/// 1 MiB avoids those in the common small-chunk case while keeping the
/// worst-case adversarial reservation flat at 1 MiB.
pub const DECOMPRESS_BOOTSTRAP_CAPACITY: usize = 1024 * 1024; // 1 MiB
324
325/// v0.8.6 #89: shared pre-allocation manifest validator invoked by every
326/// decompress path (CpuZstd / CpuGzip / nvCOMP Zstd / Bitcomp /
327/// GDeflate). Centralising the check keeps every decompress site using
328/// identical limits and error shapes, so one missed update can't
329/// reintroduce the alloc-before-validate bug. Returns the
330/// `usize`-narrowed `original_size` ready for `Vec::with_capacity`, or a
331/// typed `CodecError` the caller propagates verbatim.
332///
333/// Was `nvcomp::validate_decompress_manifest` (v0.8.5 #83). Promoted
334/// out of the `#[cfg(any(feature = "nvcomp-gpu", test))]` gate so CPU
335/// codecs can call it unconditionally.
336pub fn validate_decompress_manifest(
337 manifest: &ChunkManifest,
338 actual_compressed_len: usize,
339) -> Result<usize, CodecError> {
340 if manifest.original_size > MAX_DECOMPRESSED_BYTES {
341 return Err(CodecError::ManifestSizeExceedsLimit {
342 requested: manifest.original_size,
343 limit: MAX_DECOMPRESSED_BYTES,
344 });
345 }
346 if manifest.compressed_size != actual_compressed_len as u64 {
347 return Err(CodecError::ManifestSizeMismatch {
348 manifest: manifest.compressed_size,
349 actual: actual_compressed_len as u64,
350 });
351 }
352 usize::try_from(manifest.original_size).map_err(|_| CodecError::ManifestSizeExceedsLimit {
353 requested: manifest.original_size,
354 limit: usize::MAX as u64,
355 })
356}
357
/// Pluggable compression backend trait.
///
/// Every operation is async: GPU codecs can await a CUDA stream, while CPU
/// codecs hand the work to another thread via `spawn_blocking`.
#[async_trait::async_trait]
pub trait Codec: Send + Sync {
    /// The kind of codec this implementation provides.
    fn kind(&self) -> CodecKind;

    /// Compress: input bytes → compressed bytes plus the manifest
    /// describing them.
    async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError>;

    /// Decompress: compressed bytes plus their manifest → the original
    /// bytes.
    async fn decompress(&self, input: Bytes, manifest: &ChunkManifest)
        -> Result<Bytes, CodecError>;
}
374
375pub use dispatcher::CodecDispatcher;