1use bytes::{Buf, BufMut, Bytes, BytesMut};
70use thiserror::Error;
71
/// Four magic bytes that open every encoded index.
pub const INDEX_MAGIC: &[u8; 4] = b"S4IX";

/// Format version written by `encode_index`.
pub const INDEX_VERSION: u32 = 2;

/// Older format version that `decode_index` still accepts.
pub const INDEX_VERSION_V1: u32 = 1;

/// Publicly exported header size (40 bytes).
///
/// NOTE(review): this matches neither the v1 fixed header (32 bytes) nor the
/// v2 fixed header (44 bytes) used by the encoder/decoder in this file —
/// confirm which layout external callers expect before relying on it.
pub const INDEX_HEADER_BYTES: usize = 4 + 4 + 8 + 8 + 4 + 4 + 8;

/// Fixed v1 header: magic, `u32` version, then three `u64` fields
/// (entry count, total original size, total padded size).
const HEADER_FIXED_V1: usize =
    INDEX_MAGIC.len() + std::mem::size_of::<u32>() + 3 * std::mem::size_of::<u64>();

/// Fixed v2 header: the v1 header followed by a `u64` source compressed size
/// and a `u32` etag length (the etag bytes themselves are variable-length).
const HEADER_FIXED_V2: usize =
    HEADER_FIXED_V1 + std::mem::size_of::<u64>() + std::mem::size_of::<u32>();

/// Encoded size of one index entry: four little-endian `u64` fields.
pub const ENTRY_BYTES: usize = 4 * std::mem::size_of::<u64>();
86
/// Location of one frame in both the original byte stream and the stored body.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FrameIndexEntry {
    /// Offset of this frame's data within the original (logical) stream.
    pub original_offset: u64,
    /// Number of original bytes this frame represents.
    pub original_size: u64,
    /// Byte offset of the frame within the stored body.
    pub compressed_offset: u64,
    /// Number of stored bytes occupied by the frame.
    pub compressed_size: u64,
}

impl FrameIndexEntry {
    /// Exclusive end of this frame in the original stream.
    ///
    /// Entries may come from an externally supplied index whose values are not
    /// validated, so the sum saturates at `u64::MAX` instead of panicking
    /// (debug builds) or silently wrapping (release builds).
    pub fn original_end(&self) -> u64 {
        self.original_offset.saturating_add(self.original_size)
    }

    /// Exclusive end of this frame in the stored body.
    ///
    /// Saturates at `u64::MAX` for the same reason as [`Self::original_end`].
    pub fn compressed_end(&self) -> u64 {
        self.compressed_offset.saturating_add(self.compressed_size)
    }
}
107
108#[derive(Debug, Clone, Default, PartialEq, Eq)]
109pub struct FrameIndex {
110 pub total_padded_size: u64,
112 pub entries: Vec<FrameIndexEntry>,
113 pub source_etag: Option<String>,
121 pub source_compressed_size: Option<u64>,
127}
128
129impl FrameIndex {
130 pub fn total_original_size(&self) -> u64 {
131 self.entries.last().map(|e| e.original_end()).unwrap_or(0)
132 }
133
134 pub fn lookup_range(&self, start: u64, end_exclusive: u64) -> Option<RangePlan> {
140 if self.entries.is_empty() || start >= end_exclusive {
141 return None;
142 }
143 let total = self.total_original_size();
144 if start >= total {
145 return None;
146 }
147 let clamped_end = end_exclusive.min(total);
148
149 let first_idx = match self.entries.binary_search_by(|e| {
151 if e.original_end() <= start {
152 std::cmp::Ordering::Less
153 } else if e.original_offset > start {
154 std::cmp::Ordering::Greater
155 } else {
156 std::cmp::Ordering::Equal
157 }
158 }) {
159 Ok(i) => i,
160 Err(_) => return None,
161 };
162 let last_inclusive = clamped_end - 1;
164 let last_idx = match self.entries.binary_search_by(|e| {
165 if e.original_end() <= last_inclusive {
166 std::cmp::Ordering::Less
167 } else if e.original_offset > last_inclusive {
168 std::cmp::Ordering::Greater
169 } else {
170 std::cmp::Ordering::Equal
171 }
172 }) {
173 Ok(i) => i,
174 Err(_) => return None,
175 };
176
177 let byte_start = self.entries[first_idx].compressed_offset;
178 let byte_end_exclusive = self.entries[last_idx].compressed_end();
179 Some(RangePlan {
180 first_frame_idx: first_idx,
181 last_frame_idx_inclusive: last_idx,
182 byte_start,
183 byte_end_exclusive,
184 slice_start_in_combined: start - self.entries[first_idx].original_offset,
186 slice_end_in_combined: clamped_end - self.entries[first_idx].original_offset,
187 })
188 }
189}
190
/// Result of `FrameIndex::lookup_range`: which frames to fetch and how to
/// slice the bytes they produce.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RangePlan {
    /// Index of the first frame covering the requested range.
    pub first_frame_idx: usize,
    /// Index of the last frame covering the requested range (inclusive).
    pub last_frame_idx_inclusive: usize,
    /// Offset in the stored body where the first frame begins.
    pub byte_start: u64,
    /// Offset in the stored body just past the end of the last frame.
    pub byte_end_exclusive: u64,
    /// Start of the requested data, measured from the first frame's original offset.
    pub slice_start_in_combined: u64,
    /// Exclusive end of the requested data, measured from the first frame's
    /// original offset.
    pub slice_end_in_combined: u64,
}
203
204#[derive(Debug, Error)]
205pub enum IndexError {
206 #[error("index too short: {0} bytes")]
207 TooShort(usize),
208 #[error("bad index magic: {got:?}")]
209 BadMagic { got: [u8; 4] },
210 #[error("unsupported index version {0} (this build supports {INDEX_VERSION})")]
211 UnsupportedVersion(u32),
212 #[error("entry count {claimed} doesn't match buffer remaining {remaining}")]
213 EntryCountMismatch { claimed: u64, remaining: usize },
214}
215
216pub fn encode_index(idx: &FrameIndex) -> Bytes {
221 let etag_bytes = idx.source_etag.as_deref().unwrap_or("").as_bytes();
222 let mut buf = BytesMut::with_capacity(
223 HEADER_FIXED_V2 + etag_bytes.len() + idx.entries.len() * ENTRY_BYTES,
224 );
225 buf.put_slice(INDEX_MAGIC);
226 buf.put_u32_le(INDEX_VERSION);
227 buf.put_u64_le(idx.entries.len() as u64);
228 buf.put_u64_le(idx.total_original_size());
229 buf.put_u64_le(idx.total_padded_size);
230 buf.put_u64_le(idx.source_compressed_size.unwrap_or(0));
232 buf.put_u32_le(etag_bytes.len() as u32);
233 buf.put_slice(etag_bytes);
234 for e in &idx.entries {
235 buf.put_u64_le(e.original_offset);
236 buf.put_u64_le(e.original_size);
237 buf.put_u64_le(e.compressed_offset);
238 buf.put_u64_le(e.compressed_size);
239 }
240 buf.freeze()
241}
242
243#[doc(hidden)]
248pub fn encode_index_v1_for_test(idx: &FrameIndex) -> Bytes {
249 let mut buf = BytesMut::with_capacity(HEADER_FIXED_V1 + idx.entries.len() * ENTRY_BYTES);
250 buf.put_slice(INDEX_MAGIC);
251 buf.put_u32_le(INDEX_VERSION_V1);
252 buf.put_u64_le(idx.entries.len() as u64);
253 buf.put_u64_le(idx.total_original_size());
254 buf.put_u64_le(idx.total_padded_size);
255 for e in &idx.entries {
256 buf.put_u64_le(e.original_offset);
257 buf.put_u64_le(e.original_size);
258 buf.put_u64_le(e.compressed_offset);
259 buf.put_u64_le(e.compressed_size);
260 }
261 buf.freeze()
262}
263
264pub fn decode_index(mut input: Bytes) -> Result<FrameIndex, IndexError> {
265 if input.len() < HEADER_FIXED_V1 {
266 return Err(IndexError::TooShort(input.len()));
267 }
268 let mut magic = [0u8; 4];
269 magic.copy_from_slice(&input[..4]);
270 if &magic != INDEX_MAGIC {
271 return Err(IndexError::BadMagic { got: magic });
272 }
273 input.advance(4);
274 let version = input.get_u32_le();
275 let n = input.get_u64_le();
276 let _total_original = input.get_u64_le();
277 let total_padded_size = input.get_u64_le();
278 let (source_compressed_size, source_etag) = match version {
281 v if v == INDEX_VERSION_V1 => (None, None),
282 v if v == INDEX_VERSION => {
283 if input.len() < 8 + 4 {
285 return Err(IndexError::TooShort(input.len()));
286 }
287 let scs = input.get_u64_le();
288 let etag_len = input.get_u32_le() as usize;
289 if input.len() < etag_len {
290 return Err(IndexError::TooShort(input.len()));
291 }
292 let etag_bytes = input.split_to(etag_len);
296 let etag = if etag_len == 0 {
297 None
298 } else {
299 std::str::from_utf8(&etag_bytes).ok().map(str::to_owned)
300 };
301 (if scs == 0 { None } else { Some(scs) }, etag)
302 }
303 other => return Err(IndexError::UnsupportedVersion(other)),
304 };
305 let expected_remaining = (n as usize).saturating_mul(ENTRY_BYTES);
306 if input.len() != expected_remaining {
307 return Err(IndexError::EntryCountMismatch {
308 claimed: n,
309 remaining: input.len(),
310 });
311 }
312 const BOOTSTRAP_ENTRIES: usize = 4096;
327 let initial_cap = (n as usize).min(BOOTSTRAP_ENTRIES);
328 let mut entries = Vec::with_capacity(initial_cap);
329 for _ in 0..n {
330 let original_offset = input.get_u64_le();
331 let original_size = input.get_u64_le();
332 let compressed_offset = input.get_u64_le();
333 let compressed_size = input.get_u64_le();
334 entries.push(FrameIndexEntry {
335 original_offset,
336 original_size,
337 compressed_offset,
338 compressed_size,
339 });
340 }
341 Ok(FrameIndex {
342 total_padded_size,
343 entries,
344 source_etag,
345 source_compressed_size,
346 })
347}
348
349pub fn build_index_from_body(body: &Bytes) -> Result<FrameIndex, crate::multipart::FrameError> {
352 let mut entries = Vec::new();
353 let mut original_off: u64 = 0;
354 let mut cursor = 0usize;
356 let mut iter_buf = body.clone();
357 while cursor < body.len() {
358 if cursor + 4 <= body.len() && &body[cursor..cursor + 4] == crate::multipart::PADDING_MAGIC
360 {
361 if cursor + crate::multipart::PADDING_HEADER_BYTES > body.len() {
363 break;
364 }
365 let pad_len = u64::from_le_bytes(body[cursor + 4..cursor + 12].try_into().unwrap());
366 cursor += crate::multipart::PADDING_HEADER_BYTES + pad_len as usize;
367 iter_buf = body.slice(cursor..);
368 continue;
369 }
370 if cursor + crate::multipart::FRAME_HEADER_BYTES > body.len() {
372 break;
373 }
374 let (header, _payload, rest) = crate::multipart::read_frame(iter_buf.clone())?;
375 let frame_total = crate::multipart::FRAME_HEADER_BYTES + header.compressed_size as usize;
376 entries.push(FrameIndexEntry {
377 original_offset: original_off,
378 original_size: header.original_size,
379 compressed_offset: cursor as u64,
380 compressed_size: frame_total as u64,
381 });
382 original_off += header.original_size;
383 cursor += frame_total;
384 iter_buf = rest;
385 }
386 Ok(FrameIndex {
387 total_padded_size: body.len() as u64,
388 entries,
389 source_etag: None,
394 source_compressed_size: None,
395 })
396}
397
/// Returns the key of the index sidecar for `object_key`: the key itself with
/// `.s4index` appended.
pub fn sidecar_key(object_key: &str) -> String {
    const SUFFIX: &str = ".s4index";

    let mut key = String::with_capacity(object_key.len() + SUFFIX.len());
    key.push_str(object_key);
    key.push_str(SUFFIX);
    key
}
402
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CodecKind;
    use crate::multipart::{FrameHeader, pad_to_minimum, write_frame};

    /// Three contiguous frames covering original bytes 0..230, with gaps
    /// between the stored frames.
    fn sample_index() -> FrameIndex {
        FrameIndex {
            total_padded_size: 200,
            entries: vec![
                FrameIndexEntry {
                    original_offset: 0,
                    original_size: 100,
                    compressed_offset: 0,
                    compressed_size: 50,
                },
                FrameIndexEntry {
                    original_offset: 100,
                    original_size: 80,
                    compressed_offset: 60,
                    compressed_size: 40,
                },
                FrameIndexEntry {
                    original_offset: 180,
                    original_size: 50,
                    compressed_offset: 100,
                    compressed_size: 30,
                },
            ],
            source_etag: None,
            source_compressed_size: None,
        }
    }

    #[test]
    fn encode_decode_roundtrip() {
        let idx = sample_index();
        let bytes = encode_index(&idx);
        let decoded = decode_index(bytes).unwrap();
        assert_eq!(decoded, idx);
    }

    #[test]
    fn encode_decode_roundtrip_v2_with_source_binding() {
        let mut idx = sample_index();
        idx.source_etag = Some("\"deadbeefcafe\"".into());
        idx.source_compressed_size = Some(987_654);
        let bytes = encode_index(&idx);
        assert_eq!(&bytes[..4], INDEX_MAGIC);
        let version = u32::from_le_bytes(bytes[4..8].try_into().unwrap());
        assert_eq!(version, INDEX_VERSION, "writer must always emit v2");
        let decoded = decode_index(bytes).unwrap();
        assert_eq!(decoded, idx);
    }

    #[test]
    fn sidecar_header_back_compat_old_format_no_source_etag() {
        let v2_idx = {
            let mut idx = sample_index();
            idx.source_etag = Some("\"unused\"".into());
            idx.source_compressed_size = Some(42);
            idx
        };
        let v1_bytes = encode_index_v1_for_test(&v2_idx);
        let version = u32::from_le_bytes(v1_bytes[4..8].try_into().unwrap());
        assert_eq!(version, INDEX_VERSION_V1);
        let decoded = decode_index(v1_bytes).expect("v1 sidecar must still decode");
        assert_eq!(decoded.entries, v2_idx.entries);
        assert_eq!(decoded.total_padded_size, v2_idx.total_padded_size);
        // The v1 layout has no room for the source binding fields.
        assert_eq!(decoded.source_etag, None);
        assert_eq!(decoded.source_compressed_size, None);
    }

    #[test]
    fn decode_rejects_malformed_input() {
        let good = encode_index(&sample_index()).to_vec();

        // Shorter than the fixed v1 header.
        assert!(matches!(
            decode_index(Bytes::from_static(b"S4IX")),
            Err(IndexError::TooShort(4))
        ));

        let mut bad_magic = good.clone();
        bad_magic[0] = b'X';
        assert!(matches!(
            decode_index(Bytes::from(bad_magic)),
            Err(IndexError::BadMagic { got }) if &got == b"X4IX"
        ));

        let mut bad_version = good.clone();
        bad_version[4..8].copy_from_slice(&99u32.to_le_bytes());
        assert!(matches!(
            decode_index(Bytes::from(bad_version)),
            Err(IndexError::UnsupportedVersion(99))
        ));

        // Dropping one byte leaves 95 bytes for three 32-byte entries.
        let mut truncated = good;
        truncated.pop();
        assert!(matches!(
            decode_index(Bytes::from(truncated)),
            Err(IndexError::EntryCountMismatch {
                claimed: 3,
                remaining: 95
            })
        ));
    }

    #[test]
    fn lookup_range_within_single_frame() {
        let idx = sample_index();
        let plan = idx.lookup_range(10, 50).unwrap();
        assert_eq!(plan.first_frame_idx, 0);
        assert_eq!(plan.last_frame_idx_inclusive, 0);
        assert_eq!(plan.byte_start, 0);
        assert_eq!(plan.byte_end_exclusive, 50);
        assert_eq!(plan.slice_start_in_combined, 10);
        assert_eq!(plan.slice_end_in_combined, 50);
    }

    #[test]
    fn lookup_range_spans_frames() {
        let idx = sample_index();
        let plan = idx.lookup_range(50, 150).unwrap();
        assert_eq!(plan.first_frame_idx, 0);
        assert_eq!(plan.last_frame_idx_inclusive, 1);
        assert_eq!(plan.byte_start, 0);
        assert_eq!(plan.byte_end_exclusive, 100);
        assert_eq!(plan.slice_start_in_combined, 50);
        assert_eq!(plan.slice_end_in_combined, 150);
    }

    #[test]
    fn lookup_range_at_end_clamps() {
        let idx = sample_index();
        let plan = idx.lookup_range(200, 1000).unwrap();
        assert_eq!(plan.first_frame_idx, 2);
        assert_eq!(plan.last_frame_idx_inclusive, 2);
        assert_eq!(plan.byte_start, 100);
        assert_eq!(plan.byte_end_exclusive, 130);
        // The last frame starts at original offset 180 and the end is clamped
        // to the total of 230, so the slice inside that frame is 20..50.
        assert_eq!(plan.slice_start_in_combined, 20);
        assert_eq!(plan.slice_end_in_combined, 50);
    }

    #[test]
    fn lookup_range_out_of_bounds_returns_none() {
        let idx = sample_index();
        assert!(idx.lookup_range(500, 600).is_none());
    }

    #[test]
    fn lookup_range_rejects_empty_index_and_empty_range() {
        assert!(FrameIndex::default().lookup_range(0, 10).is_none());
        let idx = sample_index();
        assert!(idx.lookup_range(50, 50).is_none());
        assert!(idx.lookup_range(60, 50).is_none());
    }

    #[test]
    fn build_index_from_real_body_skips_padding() {
        let mut buf = BytesMut::new();
        let p1 = Bytes::from_static(b"AAAA");
        write_frame(
            &mut buf,
            FrameHeader {
                codec: CodecKind::Passthrough,
                original_size: 100,
                compressed_size: p1.len() as u64,
                crc32c: 0,
            },
            &p1,
        );
        let frame1_end = buf.len();
        pad_to_minimum(&mut buf, 5000);
        let pad_end = buf.len();
        let p2 = Bytes::from_static(b"BBBB");
        write_frame(
            &mut buf,
            FrameHeader {
                codec: CodecKind::Passthrough,
                original_size: 80,
                compressed_size: p2.len() as u64,
                crc32c: 0,
            },
            &p2,
        );

        let idx = build_index_from_body(&buf.freeze()).unwrap();
        assert_eq!(idx.entries.len(), 2);
        assert_eq!(idx.entries[0].original_offset, 0);
        assert_eq!(idx.entries[0].compressed_offset, 0);
        assert_eq!(idx.entries[0].original_size, 100);
        assert_eq!(idx.entries[0].compressed_size, frame1_end as u64);
        assert_eq!(idx.entries[1].original_offset, 100);
        assert_eq!(idx.entries[1].compressed_offset, pad_end as u64);
        assert_eq!(idx.entries[1].original_size, 80);
        assert_eq!(idx.total_original_size(), 180);
    }
}