use bytes::{Buf, BufMut, Bytes, BytesMut};
use thiserror::Error;

pub const INDEX_MAGIC: &[u8; 4] = b"S4IX";
/// Current sidecar index format version.
pub const INDEX_VERSION: u32 = 2;
/// Legacy format version without source binding.
pub const INDEX_VERSION_V1: u32 = 1;

pub const INDEX_HEADER_BYTES: usize = 4 + 4 + 8 + 8 + 4 + 4 + 8;

/// Fixed v1 header: magic (4) + version (4) + entry count (8)
/// + total original size (8) + total padded size (8).
const HEADER_FIXED_V1: usize = 4 + 4 + 8 + 8 + 8;
/// v2 extends the v1 header with source compressed size (8) + etag length (4).
const HEADER_FIXED_V2: usize = HEADER_FIXED_V1 + 8 + 4;

/// Four little-endian u64 fields per entry.
pub const ENTRY_BYTES: usize = 8 + 8 + 8 + 8;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FrameIndexEntry {
    /// Byte offset of this frame's payload within the original (uncompressed) object.
    pub original_offset: u64,
    /// Uncompressed size of this frame's payload.
    pub original_size: u64,
    /// Byte offset of the frame within the stored (compressed) body.
    pub compressed_offset: u64,
    /// Stored size of the frame, including its header.
    pub compressed_size: u64,
}

impl FrameIndexEntry {
    pub fn original_end(&self) -> u64 {
        self.original_offset + self.original_size
    }
    pub fn compressed_end(&self) -> u64 {
        self.compressed_offset + self.compressed_size
    }
}

#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FrameIndex {
    /// Total size of the stored body, including any padding records.
    pub total_padded_size: u64,
    /// Frame entries, sorted by `original_offset` and contiguous in original space.
    pub entries: Vec<FrameIndexEntry>,
    /// ETag of the source object this index describes, if known (v2 sidecars only).
    pub source_etag: Option<String>,
    /// Compressed size of the source object, if known (v2 sidecars only).
    pub source_compressed_size: Option<u64>,
}

impl FrameIndex {
    pub fn total_original_size(&self) -> u64 {
        // Entries are contiguous in original space, so the last entry's end is the total.
        self.entries.last().map(|e| e.original_end()).unwrap_or(0)
    }

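    /// Maps the original-byte range `[start, end_exclusive)` onto the
    /// compressed frames that cover it. Returns `None` if the index is
    /// empty, the range is empty, or `start` is past the end of the data;
    /// `end_exclusive` is clamped to the total original size.
    ///
    /// A minimal sketch (not compiled as a doctest; assumes the surrounding
    /// types are in scope):
    ///
    /// ```ignore
    /// let idx = FrameIndex {
    ///     total_padded_size: 90,
    ///     entries: vec![
    ///         FrameIndexEntry { original_offset: 0, original_size: 100, compressed_offset: 0, compressed_size: 50 },
    ///         FrameIndexEntry { original_offset: 100, original_size: 80, compressed_offset: 50, compressed_size: 40 },
    ///     ],
    ///     source_etag: None,
    ///     source_compressed_size: None,
    /// };
    /// // Original bytes 90..120 straddle both frames.
    /// let plan = idx.lookup_range(90, 120).unwrap();
    /// assert_eq!((plan.first_frame_idx, plan.last_frame_idx_inclusive), (0, 1));
    /// assert_eq!((plan.byte_start, plan.byte_end_exclusive), (0, 90));
    /// assert_eq!((plan.slice_start_in_combined, plan.slice_end_in_combined), (90, 120));
    /// ```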
    pub fn lookup_range(&self, start: u64, end_exclusive: u64) -> Option<RangePlan> {
        if self.entries.is_empty() || start >= end_exclusive {
            return None;
        }
        let total = self.total_original_size();
        if start >= total {
            return None;
        }
        let clamped_end = end_exclusive.min(total);

        // Find the frame containing the first requested original byte.
        let first_idx = match self.entries.binary_search_by(|e| {
            if e.original_end() <= start {
                std::cmp::Ordering::Less
            } else if e.original_offset > start {
                std::cmp::Ordering::Greater
            } else {
                std::cmp::Ordering::Equal
            }
        }) {
            Ok(i) => i,
            Err(_) => return None,
        };
        // Find the frame containing the last requested original byte.
        let last_inclusive = clamped_end - 1;
        let last_idx = match self.entries.binary_search_by(|e| {
            if e.original_end() <= last_inclusive {
                std::cmp::Ordering::Less
            } else if e.original_offset > last_inclusive {
                std::cmp::Ordering::Greater
            } else {
                std::cmp::Ordering::Equal
            }
        }) {
            Ok(i) => i,
            Err(_) => return None,
        };

        let byte_start = self.entries[first_idx].compressed_offset;
        let byte_end_exclusive = self.entries[last_idx].compressed_end();
        Some(RangePlan {
            first_frame_idx: first_idx,
            last_frame_idx_inclusive: last_idx,
            byte_start,
            byte_end_exclusive,
            slice_start_in_combined: start - self.entries[first_idx].original_offset,
            slice_end_in_combined: clamped_end - self.entries[first_idx].original_offset,
        })
    }
}

/// Plan for serving an original-byte range: which frames to fetch and how to
/// slice the decompressed result.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangePlan {
    pub first_frame_idx: usize,
    pub last_frame_idx_inclusive: usize,
    /// Compressed byte range to fetch: `[byte_start, byte_end_exclusive)`.
    pub byte_start: u64,
    pub byte_end_exclusive: u64,
    /// Slice to take from the concatenated decompressed frames.
    pub slice_start_in_combined: u64,
    pub slice_end_in_combined: u64,
}

#[derive(Debug, Error)]
pub enum IndexError {
    #[error("index too short: {0} bytes")]
    TooShort(usize),
    #[error("bad index magic: {got:?}")]
    BadMagic { got: [u8; 4] },
    #[error("unsupported index version {0} (this build supports {INDEX_VERSION})")]
    UnsupportedVersion(u32),
    #[error("entry count {claimed} doesn't match buffer remaining {remaining}")]
    EntryCountMismatch { claimed: u64, remaining: usize },
}

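/// Serializes a `FrameIndex` in the current (v2) sidecar format. All
/// integers are little-endian:
///
/// - magic (4 bytes) | version (u32) | entry count (u64)
/// - total original size (u64) | total padded size (u64)
/// - source compressed size (u64, 0 = unknown) | etag length (u32) | etag bytes
/// - `entry count` entries of `ENTRY_BYTES` each
///
/// A missing `source_compressed_size` is written as 0 and decodes back as
/// `None`; an absent etag is written with length 0.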
pub fn encode_index(idx: &FrameIndex) -> Bytes {
    let etag_bytes = idx.source_etag.as_deref().unwrap_or("").as_bytes();
    let mut buf = BytesMut::with_capacity(
        HEADER_FIXED_V2 + etag_bytes.len() + idx.entries.len() * ENTRY_BYTES,
    );
    buf.put_slice(INDEX_MAGIC);
    buf.put_u32_le(INDEX_VERSION);
    buf.put_u64_le(idx.entries.len() as u64);
    buf.put_u64_le(idx.total_original_size());
    buf.put_u64_le(idx.total_padded_size);
    buf.put_u64_le(idx.source_compressed_size.unwrap_or(0));
    buf.put_u32_le(etag_bytes.len() as u32);
    buf.put_slice(etag_bytes);
    for e in &idx.entries {
        buf.put_u64_le(e.original_offset);
        buf.put_u64_le(e.original_size);
        buf.put_u64_le(e.compressed_offset);
        buf.put_u64_le(e.compressed_size);
    }
    buf.freeze()
}

/// Emits the legacy v1 layout (no source binding) so tests can exercise the
/// backward-compatible decode path.
#[doc(hidden)]
pub fn encode_index_v1_for_test(idx: &FrameIndex) -> Bytes {
    let mut buf = BytesMut::with_capacity(HEADER_FIXED_V1 + idx.entries.len() * ENTRY_BYTES);
    buf.put_slice(INDEX_MAGIC);
    buf.put_u32_le(INDEX_VERSION_V1);
    buf.put_u64_le(idx.entries.len() as u64);
    buf.put_u64_le(idx.total_original_size());
    buf.put_u64_le(idx.total_padded_size);
    for e in &idx.entries {
        buf.put_u64_le(e.original_offset);
        buf.put_u64_le(e.original_size);
        buf.put_u64_le(e.compressed_offset);
        buf.put_u64_le(e.compressed_size);
    }
    buf.freeze()
}

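/// Parses a sidecar index, accepting both the current v2 layout and the
/// legacy v1 layout (v1 carries no source etag or compressed size, so both
/// decode to `None`).
///
/// A minimal roundtrip sketch (not compiled as a doctest):
///
/// ```ignore
/// let bytes = encode_index(&idx);
/// let decoded = decode_index(bytes)?;
/// assert_eq!(decoded, idx);
/// ```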
pub fn decode_index(mut input: Bytes) -> Result<FrameIndex, IndexError> {
    if input.len() < HEADER_FIXED_V1 {
        return Err(IndexError::TooShort(input.len()));
    }
    let mut magic = [0u8; 4];
    magic.copy_from_slice(&input[..4]);
    if &magic != INDEX_MAGIC {
        return Err(IndexError::BadMagic { got: magic });
    }
    input.advance(4);
    let version = input.get_u32_le();
    let n = input.get_u64_le();
    let _total_original = input.get_u64_le();
    let total_padded_size = input.get_u64_le();
    let (source_compressed_size, source_etag) = match version {
        v if v == INDEX_VERSION_V1 => (None, None),
        v if v == INDEX_VERSION => {
            if input.len() < 8 + 4 {
                return Err(IndexError::TooShort(input.len()));
            }
            let scs = input.get_u64_le();
            let etag_len = input.get_u32_le() as usize;
            if input.len() < etag_len {
                return Err(IndexError::TooShort(input.len()));
            }
            let etag_bytes = input.split_to(etag_len);
            let etag = if etag_len == 0 {
                None
            } else {
                std::str::from_utf8(&etag_bytes).ok().map(str::to_owned)
            };
            // Zero is the "unknown" sentinel written by the encoder.
            (if scs == 0 { None } else { Some(scs) }, etag)
        }
        other => return Err(IndexError::UnsupportedVersion(other)),
    };
    let expected_remaining = (n as usize).saturating_mul(ENTRY_BYTES);
    if input.len() != expected_remaining {
        return Err(IndexError::EntryCountMismatch {
            claimed: n,
            remaining: input.len(),
        });
    }
    let mut entries = Vec::with_capacity(n as usize);
    for _ in 0..n {
        let original_offset = input.get_u64_le();
        let original_size = input.get_u64_le();
        let compressed_offset = input.get_u64_le();
        let compressed_size = input.get_u64_le();
        entries.push(FrameIndexEntry {
            original_offset,
            original_size,
            compressed_offset,
            compressed_size,
        });
    }
    Ok(FrameIndex {
        total_padded_size,
        entries,
        source_etag,
        source_compressed_size,
    })
}

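/// Rebuilds a `FrameIndex` by scanning a framed body from the start: padding
/// records (detected by `PADDING_MAGIC`) are skipped, and each frame yields
/// one entry whose `compressed_size` covers the frame header plus payload.
/// The returned index carries no source binding (`source_etag` and
/// `source_compressed_size` are `None`).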
pub fn build_index_from_body(body: &Bytes) -> Result<FrameIndex, crate::multipart::FrameError> {
    let mut entries = Vec::new();
    let mut original_off: u64 = 0;
    let mut cursor = 0usize;
    let mut iter_buf = body.clone();
    while cursor < body.len() {
        if cursor + 4 <= body.len() && &body[cursor..cursor + 4] == crate::multipart::PADDING_MAGIC
        {
            // Truncated padding header: stop scanning.
            if cursor + crate::multipart::PADDING_HEADER_BYTES > body.len() {
                break;
            }
            let pad_len = u64::from_le_bytes(body[cursor + 4..cursor + 12].try_into().unwrap());
            cursor += crate::multipart::PADDING_HEADER_BYTES + pad_len as usize;
            iter_buf = body.slice(cursor..);
            continue;
        }
        // Truncated frame header: stop scanning.
        if cursor + crate::multipart::FRAME_HEADER_BYTES > body.len() {
            break;
        }
        let (header, _payload, rest) = crate::multipart::read_frame(iter_buf.clone())?;
        let frame_total = crate::multipart::FRAME_HEADER_BYTES + header.compressed_size as usize;
        entries.push(FrameIndexEntry {
            original_offset: original_off,
            original_size: header.original_size,
            compressed_offset: cursor as u64,
            compressed_size: frame_total as u64,
        });
        original_off += header.original_size;
        cursor += frame_total;
        iter_buf = rest;
    }
    Ok(FrameIndex {
        total_padded_size: body.len() as u64,
        entries,
        source_etag: None,
        source_compressed_size: None,
    })
}

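/// Derives the sidecar key for an object key by appending `.s4index`, e.g.
/// `sidecar_key("data/model.bin")` returns `"data/model.bin.s4index"`.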
pub fn sidecar_key(object_key: &str) -> String {
    format!("{object_key}.s4index")
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::CodecKind;
    use crate::multipart::{FrameHeader, pad_to_minimum, write_frame};

    fn sample_index() -> FrameIndex {
        FrameIndex {
            total_padded_size: 200,
            entries: vec![
                FrameIndexEntry {
                    original_offset: 0,
                    original_size: 100,
                    compressed_offset: 0,
                    compressed_size: 50,
                },
                FrameIndexEntry {
                    original_offset: 100,
                    original_size: 80,
                    compressed_offset: 60,
                    compressed_size: 40,
                },
                FrameIndexEntry {
                    original_offset: 180,
                    original_size: 50,
                    compressed_offset: 100,
                    compressed_size: 30,
                },
            ],
            source_etag: None,
            source_compressed_size: None,
        }
    }

    #[test]
    fn encode_decode_roundtrip() {
        let idx = sample_index();
        let bytes = encode_index(&idx);
        let decoded = decode_index(bytes).unwrap();
        assert_eq!(decoded, idx);
    }

    #[test]
    fn encode_decode_roundtrip_v2_with_source_binding() {
        let mut idx = sample_index();
        idx.source_etag = Some("\"deadbeefcafe\"".into());
        idx.source_compressed_size = Some(987_654);
        let bytes = encode_index(&idx);
        assert_eq!(&bytes[..4], INDEX_MAGIC);
        let version = u32::from_le_bytes(bytes[4..8].try_into().unwrap());
        assert_eq!(version, INDEX_VERSION, "writer must always emit v2");
        let decoded = decode_index(bytes).unwrap();
        assert_eq!(decoded, idx);
    }

    #[test]
    fn sidecar_header_back_compat_old_format_no_source_etag() {
        let v2_idx = {
            let mut idx = sample_index();
            idx.source_etag = Some("\"unused\"".into());
            idx.source_compressed_size = Some(42);
            idx
        };
        let v1_bytes = encode_index_v1_for_test(&v2_idx);
        let version = u32::from_le_bytes(v1_bytes[4..8].try_into().unwrap());
        assert_eq!(version, INDEX_VERSION_V1);
        let decoded = decode_index(v1_bytes).expect("v1 sidecar must still decode");
        assert_eq!(decoded.entries, v2_idx.entries);
        assert_eq!(decoded.total_padded_size, v2_idx.total_padded_size);
        assert_eq!(decoded.source_etag, None);
        assert_eq!(decoded.source_compressed_size, None);
    }

    #[test]
    fn lookup_range_within_single_frame() {
        let idx = sample_index();
        let plan = idx.lookup_range(10, 50).unwrap();
        assert_eq!(plan.first_frame_idx, 0);
        assert_eq!(plan.last_frame_idx_inclusive, 0);
        assert_eq!(plan.byte_start, 0);
        assert_eq!(plan.byte_end_exclusive, 50);
        assert_eq!(plan.slice_start_in_combined, 10);
        assert_eq!(plan.slice_end_in_combined, 50);
    }

    #[test]
    fn lookup_range_spans_frames() {
        let idx = sample_index();
        let plan = idx.lookup_range(50, 150).unwrap();
        assert_eq!(plan.first_frame_idx, 0);
        assert_eq!(plan.last_frame_idx_inclusive, 1);
        assert_eq!(plan.byte_start, 0);
        assert_eq!(plan.byte_end_exclusive, 100);
        assert_eq!(plan.slice_start_in_combined, 50);
        assert_eq!(plan.slice_end_in_combined, 150);
    }

    #[test]
    fn lookup_range_at_end_clamps() {
        let idx = sample_index();
        let plan = idx.lookup_range(200, 1000).unwrap();
        assert_eq!(plan.first_frame_idx, 2);
        assert_eq!(plan.last_frame_idx_inclusive, 2);
        assert_eq!(plan.byte_start, 100);
        assert_eq!(plan.byte_end_exclusive, 130);
    }

    #[test]
    fn lookup_range_out_of_bounds_returns_none() {
        let idx = sample_index();
        assert!(idx.lookup_range(500, 600).is_none());
    }

    #[test]
    fn build_index_from_real_body_skips_padding() {
        let mut buf = BytesMut::new();
        let p1 = Bytes::from_static(b"AAAA");
        write_frame(
            &mut buf,
            FrameHeader {
                codec: CodecKind::Passthrough,
                original_size: 100,
                compressed_size: p1.len() as u64,
                crc32c: 0,
            },
            &p1,
        );
        let frame1_end = buf.len();
        pad_to_minimum(&mut buf, 5000);
        let pad_end = buf.len();
        let p2 = Bytes::from_static(b"BBBB");
        write_frame(
            &mut buf,
            FrameHeader {
                codec: CodecKind::Passthrough,
                original_size: 80,
                compressed_size: p2.len() as u64,
                crc32c: 0,
            },
            &p2,
        );

        let idx = build_index_from_body(&buf.freeze()).unwrap();
        assert_eq!(idx.entries.len(), 2);
        assert_eq!(idx.entries[0].original_offset, 0);
        assert_eq!(idx.entries[0].compressed_offset, 0);
        assert_eq!(idx.entries[0].original_size, 100);
        assert_eq!(idx.entries[0].compressed_size, frame1_end as u64);
        assert_eq!(idx.entries[1].original_offset, 100);
        assert_eq!(idx.entries[1].compressed_offset, pad_end as u64);
        assert_eq!(idx.entries[1].original_size, 80);
        assert_eq!(idx.total_original_size(), 180);
    }
}