s4-codec 0.8.22

S4 (Squished S3) — pluggable GPU/CPU compression codec layer (nvCOMP zstd / Bitcomp, CPU zstd).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
//! Multipart upload で使う on-the-wire フレーム形式。
//!
//! ## 課題
//!
//! AWS S3 multipart upload は各 part を独立にアップロードし、CompleteMultipartUpload
//! で順番に concat した bytes が最終 object になる。S4 が per-part で圧縮すると、
//! 最終 object は **N 個の圧縮済 chunk の concat**。GET 時に「どこからどこまでが
//! 1 chunk か」を知るためのメタが必要だが、object metadata には全 chunk の境界を
//! 入れる容量がない (S3 metadata 上限 2 KB、1000 parts × 8 byte = 8 KB で溢れる)。
//!
//! ## 解決策: in-band frame header
//!
//! 各 part bytes の先頭に固定 24 byte のフレームヘッダを置き、続く `compressed_size`
//! バイトが圧縮済 payload。GET は object 全体を読み込み、先頭から frame を順に
//! parse し各 chunk を解凍 → 連結する。
//!
//! ```text
//! ┌──────────────────────────── 24 bytes ────────────────────────────┐
//! │ magic    │ orig_size │ compressed_size │ crc32c │   ── then payload ──
//! │ "S4F1"   │  u64 LE   │     u64 LE      │ u32 LE │ [compressed_size bytes]
//! └──────────┴───────────┴─────────────────┴────────┘
//! ```
//!
//! - codec は object metadata の `s4-codec` で **全 part 共通** (CreateMultipartUpload
//!   で固定)。Phase 2 で per-frame codec 化を検討可。
//! - object metadata に `s4-multipart=true` を立てておき、GET 側はそれを見て frame
//!   parse を有効化する。
//!
//! ## 制限事項 (Phase 1)
//!
//! - **Range GET 非対応**: chunk 境界と byte offset の対応を計算しないので、
//!   client が Range を指定しても無視 (もしくは下流の Range を尊重して invalid
//!   解凍になる) — 実装上は Range を S4 で reject する方が安全。Phase 2 で対応。
//! - **per-part 別 codec 非対応**: 上記 frame format に codec ID を入れるか、
//!   object metadata を per-part に拡張するかの判断は Phase 2 で。

use bytes::{Buf, BufMut, Bytes, BytesMut};
use thiserror::Error;

use crate::CodecKind;

/// Frame magic = ASCII "S4F2" (S4 Frame, version 2)。
/// v1 (S4F1) との違い: 4 byte の codec_id field を header に追加し、per-frame
/// codec dispatch を可能にした。`s4-codec` v0.0.x は v1 を読まない (released 前
/// なので backward compat 不要)。
pub const FRAME_MAGIC: &[u8; 4] = b"S4F2";
/// Padding frame magic = ASCII "S4P1" (S4 Padding, version 1)。
///
/// AWS S3 は multipart の non-final part に min 5 MB 制約を課すが、S4 が圧縮すると
/// part が 5 MB を下回ることが多発する (圧縮率 10-100x で 5 MB が 50 KB-500 KB)。
/// その場合 `write_padded_frame` が compressed payload の後ろに `[S4P1][len:u64]
/// [len bytes of zeros]` を書き込んで全体を S3 の最小サイズまで膨らませる。
/// `FrameIter` は padding を skip するので decode 側は意識不要。
pub const PADDING_MAGIC: &[u8; 4] = b"S4P1";
/// 4 (magic) + 4 (codec_id) + 8 (orig_size) + 8 (compressed_size) + 4 (crc32c) = 28
pub const FRAME_HEADER_BYTES: usize = 4 + 4 + 8 + 8 + 4;
pub const PADDING_HEADER_BYTES: usize = 4 + 8; // = 12

/// AWS S3 の non-final multipart part の最小サイズ (5 MiB)。
pub const S3_MULTIPART_MIN_PART_BYTES: usize = 5 * 1024 * 1024;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FrameHeader {
    pub codec: CodecKind,
    pub original_size: u64,
    pub compressed_size: u64,
    pub crc32c: u32,
}

#[derive(Debug, Error)]
pub enum FrameError {
    #[error("frame too short: need at least {FRAME_HEADER_BYTES} bytes, have {0}")]
    TooShort(usize),
    #[error("bad frame magic: expected {expected:?}, got {got:?}")]
    BadMagic { expected: [u8; 4], got: [u8; 4] },
    #[error("frame compressed_size {compressed_size} exceeds remaining buffer {remaining}")]
    PayloadTruncated {
        compressed_size: u64,
        remaining: usize,
    },
    #[error("unknown codec id {0} in frame header (decoder out of date?)")]
    UnknownCodec(u32),
    /// v0.8.15 H-b: a frame's declared `compressed_size` (or padding
    /// frame length) exceeds the target architecture's
    /// `usize::MAX`. On 64-bit hosts this is unreachable; on the
    /// 32-bit `wasm32-unknown-unknown` target the `as usize` cast
    /// used to truncate, letting a forged 4 GiB+ frame parse as a
    /// 64-byte payload (silent data loss in the browser decoder).
    /// `try_from`-based validation forces the typed error instead.
    #[error("frame payload size {0} exceeds usize on this target")]
    PayloadTooLarge(u64),
}

/// 1 フレーム分を直列化: header + payload を `dst` に追記。
pub fn write_frame(dst: &mut BytesMut, header: FrameHeader, payload: &[u8]) {
    debug_assert_eq!(payload.len() as u64, header.compressed_size);
    dst.reserve(FRAME_HEADER_BYTES + payload.len());
    dst.put_slice(FRAME_MAGIC);
    dst.put_u32_le(header.codec.id());
    dst.put_u64_le(header.original_size);
    dst.put_u64_le(header.compressed_size);
    dst.put_u32_le(header.crc32c);
    dst.put_slice(payload);
}

/// `dst` の現在サイズが `min_total` byte を下回っていれば、padding frame を追記して
/// `min_total` byte 以上になるよう pad する。
///
/// # 厳密な事後条件 (v0.8.15 M-8 で明文化)
///
/// 呼び出し後 `dst.len()` は以下を満たす:
///
/// 1. `dst.len() >= min_total` (常に)
/// 2. `dst.len() <= max(min_total, prev_len + PADDING_HEADER_BYTES)` ── 1 frame
///    追記の上限。`need < PADDING_HEADER_BYTES` のケースでは padding header
///    自体が `min_total` を超える余地を作るため最大 11 byte の overshoot が
///    起こり得る。
/// 3. overshoot は最大 `PADDING_HEADER_BYTES - 1 = 11` byte。これは
///    multipart unit test `pad_to_minimum_no_excessive_overshoot`
///    (`< 5 MiB + 64`) で実証済。
///
/// padding 自体の中身は zero bytes (compress も decompress も無し)。
pub fn pad_to_minimum(dst: &mut BytesMut, min_total: usize) {
    if dst.len() >= min_total {
        return;
    }
    // 残り = min_total - 現在 ですが、padding 自体に PADDING_HEADER_BYTES 必要。
    let need = min_total - dst.len();
    let payload_len = need.saturating_sub(PADDING_HEADER_BYTES);
    // v0.8.15 M-8: `payload_len = 0` のケースでも PADDING_MAGIC + u64 length は必ず
    // 12 byte 書く必要があるが、`reserve(0)` 呼び出しは無駄でしかない (下の
    // `put_slice` / `put_u64_le` が必要分を確保する) ので、reserve は payload があると
    // きだけ行う。
    if payload_len > 0 {
        dst.reserve(PADDING_HEADER_BYTES + payload_len);
    }
    dst.put_slice(PADDING_MAGIC);
    dst.put_u64_le(payload_len as u64);
    // zero-fill。`put_bytes` で 1 回 syscall。
    dst.put_bytes(0, payload_len);
}

/// `input` の先頭から 1 フレーム読み出し、`(header, payload, remainder)` を返す。
pub fn read_frame(mut input: Bytes) -> Result<(FrameHeader, Bytes, Bytes), FrameError> {
    if input.len() < FRAME_HEADER_BYTES {
        return Err(FrameError::TooShort(input.len()));
    }
    let mut magic = [0u8; 4];
    magic.copy_from_slice(&input[..4]);
    if &magic != FRAME_MAGIC {
        return Err(FrameError::BadMagic {
            expected: *FRAME_MAGIC,
            got: magic,
        });
    }
    input.advance(4);
    let codec_id = input.get_u32_le();
    let codec = CodecKind::from_id(codec_id).ok_or(FrameError::UnknownCodec(codec_id))?;
    let original_size = input.get_u64_le();
    let compressed_size = input.get_u64_le();
    let crc32c = input.get_u32_le();
    // v0.8.15 H-b: `compressed_size as usize` used to silently
    // truncate on 32-bit targets (`s4-codec-wasm`), letting a 4 GiB+
    // forged frame decode as a 64-byte payload. `try_from` forces
    // the typed error so the WASM client surfaces the bad frame
    // instead of misreading silently.
    let compressed_size_usize = usize::try_from(compressed_size)
        .map_err(|_| FrameError::PayloadTooLarge(compressed_size))?;
    if compressed_size_usize > input.len() {
        return Err(FrameError::PayloadTruncated {
            compressed_size,
            remaining: input.len(),
        });
    }
    let payload = input.split_to(compressed_size_usize);
    Ok((
        FrameHeader {
            codec,
            original_size,
            compressed_size,
            crc32c,
        },
        payload,
        input,
    ))
}

/// `input` 全体を frame の sequence として parse、各 frame を yield する iterator。
///
/// `S4P1` (padding) を見つけたら header の length 分だけ skip して次に進む
/// (= caller には見せない)。
///
/// **エラー時の振る舞い**: parse 失敗を 1 度返したら、それ以降 next() は `None`
/// を返す (= iterator 終了)。これにより corrupt 入力に対する **無限ループ防止**
/// (proptest fuzz で発覚)。
pub struct FrameIter {
    rest: Bytes,
    fused: bool,
}

impl FrameIter {
    pub fn new(input: Bytes) -> Self {
        Self {
            rest: input,
            fused: false,
        }
    }
}

impl Iterator for FrameIter {
    type Item = Result<(FrameHeader, Bytes), FrameError>;
    fn next(&mut self) -> Option<Self::Item> {
        if self.fused {
            return None;
        }
        loop {
            if self.rest.is_empty() {
                return None;
            }
            if self.rest.len() < 4 {
                self.fused = true;
                return Some(Err(FrameError::TooShort(self.rest.len())));
            }
            let mut magic = [0u8; 4];
            magic.copy_from_slice(&self.rest[..4]);
            if &magic == PADDING_MAGIC {
                // skip padding frame: 4 magic + 8 len + len bytes
                if self.rest.len() < PADDING_HEADER_BYTES {
                    self.fused = true;
                    return Some(Err(FrameError::TooShort(self.rest.len())));
                }
                self.rest.advance(4);
                let pad_len = self.rest.get_u64_le();
                // v0.8.15 H-b: same `as usize` truncation hazard as
                // `read_frame` above. On 32-bit WASM a 4 GiB+
                // `pad_len` would skip 0 bytes silently.
                let pad_len_usize = match usize::try_from(pad_len) {
                    Ok(n) => n,
                    Err(_) => {
                        self.fused = true;
                        return Some(Err(FrameError::PayloadTooLarge(pad_len)));
                    }
                };
                if pad_len_usize > self.rest.len() {
                    self.fused = true;
                    return Some(Err(FrameError::PayloadTruncated {
                        compressed_size: pad_len,
                        remaining: self.rest.len(),
                    }));
                }
                self.rest.advance(pad_len_usize);
                continue;
            }
            // それ以外は data frame として parse
            return match read_frame(std::mem::take(&mut self.rest)) {
                Ok((hdr, payload, remainder)) => {
                    self.rest = remainder;
                    Some(Ok((hdr, payload)))
                }
                Err(e) => {
                    self.fused = true;
                    Some(Err(e))
                }
            };
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn frame_roundtrip_single() {
        let payload = Bytes::from_static(b"hello frame payload");
        let header = FrameHeader {
            codec: CodecKind::CpuZstd,
            original_size: 999,
            compressed_size: payload.len() as u64,
            crc32c: 0xdead_beef,
        };
        let mut buf = BytesMut::new();
        write_frame(&mut buf, header, &payload);
        assert_eq!(buf.len(), FRAME_HEADER_BYTES + payload.len());
        let bytes = buf.freeze();
        let (got_header, got_payload, rest) = read_frame(bytes).unwrap();
        assert_eq!(got_header, header);
        assert_eq!(got_payload, payload);
        assert!(rest.is_empty());
    }

    #[test]
    fn frame_iter_walks_all_frames_with_mixed_codecs() {
        // 異なる codec で 5 frame を交互に書く → reader が per-frame codec を返すこと
        let codecs = [
            CodecKind::Passthrough,
            CodecKind::CpuZstd,
            CodecKind::NvcompZstd,
            CodecKind::NvcompBitcomp,
            CodecKind::DietGpuAns,
        ];
        let mut buf = BytesMut::new();
        for (i, codec) in codecs.iter().enumerate() {
            let payload = vec![i as u8; (i + 1) * 4];
            let h = FrameHeader {
                codec: *codec,
                original_size: 100 + i as u64,
                compressed_size: payload.len() as u64,
                crc32c: i as u32,
            };
            write_frame(&mut buf, h, &payload);
        }
        let total = FrameIter::new(buf.freeze())
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(total.len(), 5);
        for (i, (h, payload)) in total.iter().enumerate() {
            assert_eq!(h.codec, codecs[i], "codec must be preserved per frame");
            assert_eq!(h.original_size, 100 + i as u64);
            assert_eq!(h.crc32c, i as u32);
            assert_eq!(payload.len(), (i + 1) * 4);
        }
    }

    #[test]
    fn frame_bad_magic_rejected() {
        let mut buf = BytesMut::with_capacity(FRAME_HEADER_BYTES);
        buf.put_slice(b"BAD!");
        buf.put_u32_le(0); // codec_id
        buf.put_u64_le(0);
        buf.put_u64_le(0);
        buf.put_u32_le(0);
        let err = read_frame(buf.freeze()).unwrap_err();
        assert!(matches!(err, FrameError::BadMagic { .. }));
    }

    #[test]
    fn frame_truncated_rejected() {
        // header says 100 bytes payload, but we provide 0
        let mut buf = BytesMut::with_capacity(FRAME_HEADER_BYTES);
        buf.put_slice(FRAME_MAGIC);
        buf.put_u32_le(CodecKind::CpuZstd.id());
        buf.put_u64_le(100);
        buf.put_u64_le(100);
        buf.put_u32_le(0);
        let err = read_frame(buf.freeze()).unwrap_err();
        assert!(matches!(err, FrameError::PayloadTruncated { .. }));
    }

    #[test]
    fn frame_unknown_codec_rejected() {
        let mut buf = BytesMut::with_capacity(FRAME_HEADER_BYTES);
        buf.put_slice(FRAME_MAGIC);
        buf.put_u32_le(99); // unknown codec id
        buf.put_u64_le(0);
        buf.put_u64_le(0);
        buf.put_u32_le(0);
        let err = read_frame(buf.freeze()).unwrap_err();
        assert!(matches!(err, FrameError::UnknownCodec(99)));
    }

    #[test]
    fn frame_too_short_for_header_rejected() {
        let buf = Bytes::from_static(b"shortdata");
        let err = read_frame(buf).unwrap_err();
        assert!(matches!(err, FrameError::TooShort(_)));
    }

    #[test]
    fn padding_skipped_by_iter() {
        let mut buf = BytesMut::new();
        // frame 1: small data
        let p1 = Bytes::from_static(b"first frame");
        write_frame(
            &mut buf,
            FrameHeader {
                codec: CodecKind::CpuZstd,
                original_size: 11,
                compressed_size: p1.len() as u64,
                crc32c: 0,
            },
            &p1,
        );
        // pad to 1024 bytes (well above min)
        pad_to_minimum(&mut buf, 1024);
        assert!(buf.len() >= 1024);
        // frame 2: another small data
        let p2 = Bytes::from_static(b"second frame");
        write_frame(
            &mut buf,
            FrameHeader {
                codec: CodecKind::CpuZstd,
                original_size: 12,
                compressed_size: p2.len() as u64,
                crc32c: 0,
            },
            &p2,
        );

        let frames: Vec<_> = FrameIter::new(buf.freeze())
            .collect::<Result<_, _>>()
            .unwrap();
        assert_eq!(
            frames.len(),
            2,
            "padding must be skipped, only data yielded"
        );
        assert_eq!(frames[0].1, p1);
        assert_eq!(frames[1].1, p2);
    }

    #[test]
    fn pad_to_minimum_is_noop_when_already_above() {
        let mut buf = BytesMut::new();
        buf.extend_from_slice(&[0u8; 1024]);
        pad_to_minimum(&mut buf, 100);
        assert_eq!(buf.len(), 1024);
    }

    #[test]
    fn pad_to_minimum_grows_to_target() {
        let mut buf = BytesMut::new();
        write_frame(
            &mut buf,
            FrameHeader {
                codec: CodecKind::Passthrough,
                original_size: 0,
                compressed_size: 0,
                crc32c: 0,
            },
            &[],
        );
        let before = buf.len();
        pad_to_minimum(&mut buf, 5_000_000);
        assert!(buf.len() >= 5_000_000);
        assert!(buf.len() < 5_000_000 + 64, "no excessive overshoot");
        assert!(buf.len() > before);
    }
}