Skip to main content

s4_codec/
index.rs

1//! Frame index — Range GET の partial fetch を可能にするための sidecar object 形式。
2//!
3//! ## 課題
4//!
5//! S4-multipart object は `[S4F2 frame]([S4P1 padding][S4F2 frame])*` のシーケンス。
6//! Range GET (e.g. `bytes=N-M`) を効率的に処理するには、(a) どの frame が
7//! decompressed offset N..M に対応しているか、(b) その frame は object body の
8//! どこ (compressed_offset) から始まるか、を知る必要がある。
9//!
10//! ## 解決策
11//!
12//! `<key>.s4index` という sidecar object に下記の binary index を書く:
13//!
14//! ```text
15//! ┌──── 32 byte header ────┐
16//! │ S4IX magic (4)         │
17//! │ version u32 (4)        │
18//! │ total_frames u64 (8)   │
19//! │ total_original u64 (8) │
20//! │ total_padded u64 (8)   │  ← S3 上の object サイズ (padding 含む)
21//! └────────────────────────┘
22//! 各 frame について 32 byte:
23//!   original_offset  u64 LE
24//!   original_size    u64 LE
25//!   compressed_offset u64 LE  ← S3 object body における frame header の開始位置
26//!   compressed_size  u64 LE   ← header (28 byte) + payload の合計
27//! ```
28//!
29//! 1000 frame で 32 KB、10000 frame で 320 KB。10 万 frame でも 3.2 MB に収まる。
30//!
31//! ## 使い方
32//!
33//! - PUT: 1 frame の単純 index、PUT 完了後に sidecar 書込
34//! - CompleteMultipartUpload: object 全体を一度 fetch + scan して index を構築
35//! - Range GET: sidecar fetch → `lookup_range(start, end)` で frame 範囲 + S3 byte 範囲を取得
36//!   → backend に partial Range GET → frame parse → decompress → slice
37
38use bytes::{Buf, BufMut, Bytes, BytesMut};
39use thiserror::Error;
40
/// Magic bytes that open every serialized frame index.
pub const INDEX_MAGIC: &[u8; 4] = b"S4IX";
/// Index format version written by `encode_index`; `decode_index` rejects any other value.
pub const INDEX_VERSION: u32 = 1;
/// Size in bytes of the fixed index header as it is actually serialized:
/// magic (4) + version (4) + total_frames (8) + total_original (8) + total_padded (8) = 32.
///
/// This used to be declared as 40 "(with padding)", but neither the documented
/// layout nor `encode_index` / `decode_index` ever emit or expect padding, so the
/// constant now matches the real on-disk header size.
pub const INDEX_HEADER_BYTES: usize = HEADER_FIXED;
/// Private spelling of the header size, kept as a field-by-field sum for readability.
const HEADER_FIXED: usize = 4 + 4 + 8 + 8 + 8;
/// Size in bytes of one serialized frame entry: four little-endian `u64` fields.
pub const ENTRY_BYTES: usize = 8 + 8 + 8 + 8;
46
/// Location of one data frame, both in the decompressed byte stream and in the
/// stored object body.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FrameIndexEntry {
    /// Start of the decompressed byte range covered by this frame (cumulative, 0-based).
    pub original_offset: u64,
    /// Number of bytes after decompression (same as the frame header's `original_size`).
    pub original_size: u64,
    /// Where the frame starts inside the S3 object body (first byte of the S4F2 magic).
    pub compressed_offset: u64,
    /// Total bytes occupied by the frame (28-byte header + payload).
    pub compressed_size: u64,
}

impl FrameIndexEntry {
    /// Exclusive end of the decompressed range covered by this frame.
    ///
    /// Entries may come from a decoded (and therefore untrusted) sidecar, so the
    /// sum saturates at `u64::MAX` instead of panicking in debug builds or
    /// wrapping in release builds.
    pub fn original_end(&self) -> u64 {
        self.original_offset.saturating_add(self.original_size)
    }

    /// Exclusive end of the frame inside the stored object body.
    ///
    /// Saturates at `u64::MAX` for the same reason as [`Self::original_end`].
    pub fn compressed_end(&self) -> u64 {
        self.compressed_offset.saturating_add(self.compressed_size)
    }
}
67
68#[derive(Debug, Clone, Default, PartialEq, Eq)]
69pub struct FrameIndex {
70    /// S3 上の object 全体サイズ (padding frame 含む)
71    pub total_padded_size: u64,
72    pub entries: Vec<FrameIndexEntry>,
73}
74
75impl FrameIndex {
76    pub fn total_original_size(&self) -> u64 {
77        self.entries.last().map(|e| e.original_end()).unwrap_or(0)
78    }
79
80    /// Range request `[start, end_exclusive)` を解決して必要 frame の (start_idx, end_idx_exclusive)
81    /// と S3 上の partial-fetch byte range `[byte_start, byte_end_exclusive)` を返す。
82    ///
83    /// 1 frame でもオーバーラップしていればその frame の **全 byte** を fetch する
84    /// (= 部分 frame は decompress 単位)。
85    pub fn lookup_range(&self, start: u64, end_exclusive: u64) -> Option<RangePlan> {
86        if self.entries.is_empty() || start >= end_exclusive {
87            return None;
88        }
89        let total = self.total_original_size();
90        if start >= total {
91            return None;
92        }
93        let clamped_end = end_exclusive.min(total);
94
95        // start を含む frame を二分探索 (entries は original_offset 昇順)
96        let first_idx = match self.entries.binary_search_by(|e| {
97            if e.original_end() <= start {
98                std::cmp::Ordering::Less
99            } else if e.original_offset > start {
100                std::cmp::Ordering::Greater
101            } else {
102                std::cmp::Ordering::Equal
103            }
104        }) {
105            Ok(i) => i,
106            Err(_) => return None,
107        };
108        // end を含む frame (end-1 を含むもの)
109        let last_inclusive = clamped_end - 1;
110        let last_idx = match self.entries.binary_search_by(|e| {
111            if e.original_end() <= last_inclusive {
112                std::cmp::Ordering::Less
113            } else if e.original_offset > last_inclusive {
114                std::cmp::Ordering::Greater
115            } else {
116                std::cmp::Ordering::Equal
117            }
118        }) {
119            Ok(i) => i,
120            Err(_) => return None,
121        };
122
123        let byte_start = self.entries[first_idx].compressed_offset;
124        let byte_end_exclusive = self.entries[last_idx].compressed_end();
125        Some(RangePlan {
126            first_frame_idx: first_idx,
127            last_frame_idx_inclusive: last_idx,
128            byte_start,
129            byte_end_exclusive,
130            // slice 開始 / 終了の original 内 offset
131            slice_start_in_combined: start - self.entries[first_idx].original_offset,
132            slice_end_in_combined: clamped_end - self.entries[first_idx].original_offset,
133        })
134    }
135}
136
/// Outcome of `lookup_range`.
///
/// Fetch `byte_start..byte_end_exclusive` from S3, decompress the frames in
/// that span, and slice the concatenated output with
/// `[slice_start_in_combined, slice_end_in_combined)` to obtain the requested bytes.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RangePlan {
    /// Index of the first frame that overlaps the request.
    pub first_frame_idx: usize,
    /// Index of the last frame that overlaps the request (inclusive).
    pub last_frame_idx_inclusive: usize,
    /// First byte of the stored object to fetch.
    pub byte_start: u64,
    /// One past the last byte of the stored object to fetch.
    pub byte_end_exclusive: u64,
    /// Start of the requested data within the decompressed output of the fetched frames.
    pub slice_start_in_combined: u64,
    /// Exclusive end of the requested data within the decompressed output of the fetched frames.
    pub slice_end_in_combined: u64,
}
149
150#[derive(Debug, Error)]
151pub enum IndexError {
152    #[error("index too short: {0} bytes")]
153    TooShort(usize),
154    #[error("bad index magic: {got:?}")]
155    BadMagic { got: [u8; 4] },
156    #[error("unsupported index version {0} (this build supports {INDEX_VERSION})")]
157    UnsupportedVersion(u32),
158    #[error("entry count {claimed} doesn't match buffer remaining {remaining}")]
159    EntryCountMismatch { claimed: u64, remaining: usize },
160}
161
162pub fn encode_index(idx: &FrameIndex) -> Bytes {
163    let mut buf = BytesMut::with_capacity(HEADER_FIXED + idx.entries.len() * ENTRY_BYTES);
164    buf.put_slice(INDEX_MAGIC);
165    buf.put_u32_le(INDEX_VERSION);
166    buf.put_u64_le(idx.entries.len() as u64);
167    buf.put_u64_le(idx.total_original_size());
168    buf.put_u64_le(idx.total_padded_size);
169    for e in &idx.entries {
170        buf.put_u64_le(e.original_offset);
171        buf.put_u64_le(e.original_size);
172        buf.put_u64_le(e.compressed_offset);
173        buf.put_u64_le(e.compressed_size);
174    }
175    buf.freeze()
176}
177
178pub fn decode_index(mut input: Bytes) -> Result<FrameIndex, IndexError> {
179    if input.len() < HEADER_FIXED {
180        return Err(IndexError::TooShort(input.len()));
181    }
182    let mut magic = [0u8; 4];
183    magic.copy_from_slice(&input[..4]);
184    if &magic != INDEX_MAGIC {
185        return Err(IndexError::BadMagic { got: magic });
186    }
187    input.advance(4);
188    let version = input.get_u32_le();
189    if version != INDEX_VERSION {
190        return Err(IndexError::UnsupportedVersion(version));
191    }
192    let n = input.get_u64_le();
193    let _total_original = input.get_u64_le();
194    let total_padded_size = input.get_u64_le();
195    let expected_remaining = (n as usize).saturating_mul(ENTRY_BYTES);
196    if input.len() != expected_remaining {
197        return Err(IndexError::EntryCountMismatch {
198            claimed: n,
199            remaining: input.len(),
200        });
201    }
202    let mut entries = Vec::with_capacity(n as usize);
203    for _ in 0..n {
204        let original_offset = input.get_u64_le();
205        let original_size = input.get_u64_le();
206        let compressed_offset = input.get_u64_le();
207        let compressed_size = input.get_u64_le();
208        entries.push(FrameIndexEntry {
209            original_offset,
210            original_size,
211            compressed_offset,
212            compressed_size,
213        });
214    }
215    Ok(FrameIndex {
216        total_padded_size,
217        entries,
218    })
219}
220
221/// Object body の bytes 全体を scan して FrameIndex を構築する。
222/// `multipart_e2e.rs` 等で full-scan path として使用。
223pub fn build_index_from_body(body: &Bytes) -> Result<FrameIndex, crate::multipart::FrameError> {
224    let mut entries = Vec::new();
225    let mut original_off: u64 = 0;
226    // FrameIter は padding を skip してしまうので、自前で位置追跡しながら parse する
227    let mut cursor = 0usize;
228    let mut iter_buf = body.clone();
229    while cursor < body.len() {
230        // padding magic を skip
231        if cursor + 4 <= body.len() && &body[cursor..cursor + 4] == crate::multipart::PADDING_MAGIC
232        {
233            // PADDING_HEADER_BYTES = 4 magic + 8 length
234            if cursor + crate::multipart::PADDING_HEADER_BYTES > body.len() {
235                break;
236            }
237            let pad_len = u64::from_le_bytes(body[cursor + 4..cursor + 12].try_into().unwrap());
238            cursor += crate::multipart::PADDING_HEADER_BYTES + pad_len as usize;
239            iter_buf = body.slice(cursor..);
240            continue;
241        }
242        // data frame
243        if cursor + crate::multipart::FRAME_HEADER_BYTES > body.len() {
244            break;
245        }
246        let (header, _payload, rest) = crate::multipart::read_frame(iter_buf.clone())?;
247        let frame_total = crate::multipart::FRAME_HEADER_BYTES + header.compressed_size as usize;
248        entries.push(FrameIndexEntry {
249            original_offset: original_off,
250            original_size: header.original_size,
251            compressed_offset: cursor as u64,
252            compressed_size: frame_total as u64,
253        });
254        original_off += header.original_size;
255        cursor += frame_total;
256        iter_buf = rest;
257    }
258    Ok(FrameIndex {
259        total_padded_size: body.len() as u64,
260        entries,
261    })
262}
263
/// Derives the sidecar object's key by appending `.s4index` to `object_key`.
pub fn sidecar_key(object_key: &str) -> String {
    const SUFFIX: &str = ".s4index";

    let mut key = String::with_capacity(object_key.len() + SUFFIX.len());
    key.push_str(object_key);
    key.push_str(SUFFIX);
    key
}
268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CodecKind;
    use crate::multipart::{FrameHeader, pad_to_minimum, write_frame};

    /// Shorthand so the fixture reads as one row per frame.
    fn entry(
        original_offset: u64,
        original_size: u64,
        compressed_offset: u64,
        compressed_size: u64,
    ) -> FrameIndexEntry {
        FrameIndexEntry {
            original_offset,
            original_size,
            compressed_offset,
            compressed_size,
        }
    }

    /// Three frames covering original bytes 0..230, with a 10-byte gap
    /// (standing in for padding) between the first and second frame.
    fn sample_index() -> FrameIndex {
        FrameIndex {
            total_padded_size: 200,
            entries: vec![
                entry(0, 100, 0, 50),
                entry(100, 80, 60, 40),
                entry(180, 50, 100, 30),
            ],
        }
    }

    /// Appends a passthrough frame with a zero CRC field and the given payload.
    fn push_passthrough_frame(buf: &mut BytesMut, original_size: u64, payload: &'static [u8]) {
        let payload = Bytes::from_static(payload);
        write_frame(
            buf,
            FrameHeader {
                codec: CodecKind::Passthrough,
                original_size,
                compressed_size: payload.len() as u64,
                crc32c: 0,
            },
            &payload,
        );
    }

    #[test]
    fn encode_decode_roundtrip() {
        let original = sample_index();
        let decoded = decode_index(encode_index(&original)).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn lookup_range_within_single_frame() {
        // Original bytes [10, 50) sit inside frame 0 (original 0..100),
        // so the whole of frame 0 (stored bytes 0..50) is fetched.
        let plan = sample_index().lookup_range(10, 50).unwrap();
        assert_eq!(
            plan,
            RangePlan {
                first_frame_idx: 0,
                last_frame_idx_inclusive: 0,
                byte_start: 0,
                byte_end_exclusive: 50,
                slice_start_in_combined: 10,
                slice_end_in_combined: 50,
            }
        );
    }

    #[test]
    fn lookup_range_spans_frames() {
        // [50, 150) covers the back half of frame 0 and the front half of frame 1;
        // the fetch runs from frame 0 (0..50) through frame 1 (60..100).
        let plan = sample_index().lookup_range(50, 150).unwrap();
        assert_eq!(
            plan,
            RangePlan {
                first_frame_idx: 0,
                last_frame_idx_inclusive: 1,
                byte_start: 0,
                byte_end_exclusive: 100,
                slice_start_in_combined: 50,
                slice_end_in_combined: 150,
            }
        );
    }

    #[test]
    fn lookup_range_at_end_clamps() {
        // Total original size is 100 + 80 + 50 = 230, so 200..1000 clamps to 200..230.
        let plan = sample_index().lookup_range(200, 1000).unwrap();
        assert_eq!(plan.first_frame_idx, 2);
        assert_eq!(plan.last_frame_idx_inclusive, 2);
        // All of frame 2: compressed_offset = 100, size = 30 -> stored bytes 100..130.
        assert_eq!(plan.byte_start, 100);
        assert_eq!(plan.byte_end_exclusive, 130);
    }

    #[test]
    fn lookup_range_out_of_bounds_returns_none() {
        assert!(sample_index().lookup_range(500, 600).is_none());
    }

    #[test]
    fn build_index_from_real_body_skips_padding() {
        // Two frames with padding in between; the index must record both
        // frames at their real offsets and ignore the padding.
        let mut buf = BytesMut::new();
        push_passthrough_frame(&mut buf, 100, b"AAAA");
        let frame1_end = buf.len();
        // Pad to 5000 bytes.
        pad_to_minimum(&mut buf, 5000);
        let pad_end = buf.len();
        push_passthrough_frame(&mut buf, 80, b"BBBB");

        let idx = build_index_from_body(&buf.freeze()).unwrap();
        assert_eq!(idx.entries.len(), 2);

        let first = &idx.entries[0];
        assert_eq!(first.original_offset, 0);
        assert_eq!(first.compressed_offset, 0);
        assert_eq!(first.original_size, 100);
        assert_eq!(first.compressed_size, frame1_end as u64);

        let second = &idx.entries[1];
        assert_eq!(second.original_offset, 100);
        assert_eq!(second.compressed_offset, pad_end as u64);
        assert_eq!(second.original_size, 80);

        assert_eq!(idx.total_original_size(), 180);
    }
}
395}